diff --git a/docs/_advanced-topics/hooks.md b/docs/_advanced-topics/hooks.md index 060fe04f..0539beff 100644 --- a/docs/_advanced-topics/hooks.md +++ b/docs/_advanced-topics/hooks.md @@ -209,9 +209,17 @@ Below you can find an annotated, JSON-ish encoded example of a hook response: // When the filestore is used, the Path property defines where the uploaded file is saved. // The path may be absolute or relative, and point towards a location outside of the directory // defined using the `-dir` flag. If it's relative, the path will be resolved relative to `-dir`. - "Path": "./upload-e7a036dc-33f4-451f-9520-49032b87e952/presentation.pdf" + "Path": "./upload-e7a036dc-33f4-451f-9520-49032b87e952/presentation.pdf", - // Other storages, such as S3Store, GCSStore, and AzureStore, do not support the Storage + // When the S3 storage is used, the Key property defines the key under which the uploaded file is saved. + // If the key is not set, it defaults to using the upload ID as the key. In addition, the Bucket property + // defines the bucket where the uploaded file is saved. Note that the info and part files will still + // be stored in the bucket that the S3 storage has been configured for. + // Please consult https://tus.github.io/tusd/storage-backends/aws-s3/#custom-storage-location for more details. + "Key": "upload-e7a036dc-33f4-451f-9520-49032b87e952/presentation.pdf", + "Bucket": "customer-ABC" + + // Other storages, such as GCSStore and AzureStore, do not support the Storage // property yet. } }, diff --git a/docs/_storage-backends/aws-s3.md b/docs/_storage-backends/aws-s3.md index 5494308e..f82c8d74 100644 --- a/docs/_storage-backends/aws-s3.md +++ b/docs/_storage-backends/aws-s3.md @@ -79,10 +79,38 @@ By default, the objects are stored at the root of the bucket. For example, the objects for an upload with ID `abcdef123` are: - `abcdef123.info`: Informational object - `abcdef123`: File object - `abcdef123.part`: Temporary object +For details on customizing the storage location, please read the next section. + {: .note } The file object is not visible in the S3 bucket before the upload is finished because the transferred file data is stored in the associated S3 multipart upload. Once the upload is complete, the chunks from the S3 multipart are reassembled into the file, creating the file object and removing the S3 multipart upload. In addition, the S3 multipart upload is not directly visible in the S3 bucket because it does not represent a complete object. Please don't be confused if you don't see the changes in the bucket while the file is being uploaded. +### Custom storage location + +The locations of the three objects mentioned above can be fully customized using the [pre-create hook]({{ site.baseurl }}/advanced-topics/hooks/). The keys of the `.info` and `.part` objects are derived from the upload ID, which can be customized by the pre-create hook using the [`ChangeFileInfo.ID` setting]({{ site.baseurl }}/advanced-topics/hooks/#hook-requests-and-responses). Both objects will always be saved in the bucket configured in the `-s3-bucket` flag. Similarly, the location where the file object containing the uploaded data is saved is by default derived from the upload ID, but can be fully customized using the [`ChangeFileInfo.Storage.Key` and `ChangeFileInfo.Storage.Bucket` settings]({{ site.baseurl }}/advanced-topics/hooks/#hook-requests-and-responses). 
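+
+If you use file hooks (enabled with the `-hooks-dir` flag), a pre-create hook can be implemented as an executable that reads the hook request from stdin and writes a hook response to stdout. The following is only a rough sketch of such a hook and not part of tusd: the ID, key, and bucket values are placeholders matching the example below, and a real hook would typically derive them from the incoming request and its metadata.
+
+```go
+// Minimal pre-create hook sketch, e.g. compiled and installed as <hooks-dir>/pre-create.
+package main
+
+import (
+	"encoding/json"
+	"os"
+)
+
+func main() {
+	// Respond with a ChangeFileInfo object that redirects the file object
+	// to a custom key in a customer-specific bucket.
+	response := map[string]any{
+		"ChangeFileInfo": map[string]any{
+			"ID": "project-123/abc",
+			"Storage": map[string]string{
+				"Key":    "project-123/abc/presentation.pdf",
+				"Bucket": "customer-ABC",
+			},
+		},
+	}
+	if err := json.NewEncoder(os.Stdout).Encode(response); err != nil {
+		os.Exit(1)
+	}
+}
+```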
+ +For example, consider that the pre-create hook returns the following hook response and tusd is configured with `-s3-bucket=upload-info`: + +```js +{ + "ChangeFileInfo": { + "ID": "project-123/abc", + "Storage": { + "Key": "project-123/abc/presentation.pdf", + "Bucket": "customer-ABC" + } + }, +} +``` + +Then the following objects will be created during the upload: + +- An informational object at `project-123/abc.info` in the `upload-info` bucket. +- A temporary object at `project-123/abc.part` in the `upload-info` bucket (if needed). +- The file object containing the uploaded data at `project-123/abc/presentation.pdf` in the `customer-ABC` bucket. + +If an object prefix is configured with the `-s3-object-prefix` flag, the prefix is prepended to the keys of all three objects. + ### Metadata If [metadata](https://tus.io/protocols/resumable-upload#upload-metadata) is associated with the upload during creation, it will be added to the file object once the upload is finished. Because the metadata on S3 objects must only contain ASCII characters, tusd will replace every non-ASCII character with a question mark. For example, "Menü" will become "Men?". diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index db4f29fd..d2d2c7e7 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -278,17 +278,23 @@ func (store S3Store) observeRequestDuration(start time.Time, label string) { } type s3Upload struct { - // objectId is the object key under which we save the final file - objectId string - // multipartId is the ID given by S3 to us for the multipart upload + // uploadId is the ID for the tus upload. Might be customized by pre-create hook. + // Its value does not include MetadataObjectPrefix nor ObjectPrefix. + uploadId string + // objectKey is the object key under which we save the final file. Might be customized by pre-create hook. + // Its value already includes ObjectPrefix, if that's set. + objectKey string + // objectBucket is the bucket in which the final file is saved. Might be customized by pre-create hook. + // The .info and .part files are always stored in `store.Bucket`. + objectBucket string + // multipartId is the ID given by S3 to us for the multipart upload. multipartId string store *S3Store - // info stores the upload's current FileInfo struct. It may be nil if it hasn't - // been fetched yet from S3. Never read or write to it directly but instead use + // info stores the upload's current FileInfo struct. Never read or write to it directly but instead use + // the GetInfo and writeInfo functions. - info *handler.FileInfo + info handler.FileInfo // parts collects all parts for this upload. It will be nil if info is nil as well. parts []*s3Part @@ -309,12 +315,29 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand return nil, fmt.Errorf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize) } - var objectId string if info.ID == "" { - objectId = uid.Uid() + info.ID = uid.Uid() + } + uploadId := info.ID + + // objectKey is the key under which the final object is stored. By default, + // it matches the upload ID but might also be customized by the pre-create hook. + // It must also include the optional object prefix. + var objectKey string + if info.Storage != nil && info.Storage["Key"] != "" { + objectKey = info.Storage["Key"] + } else { + objectKey = info.ID + } + objectKey = store.keyWithPrefix(objectKey) + + // Destination bucket can also be customized by the pre-create hook. 
It's used for the multipart + // upload itself, while the info and part files always go into the bucket configured in the store. + var objectBucket string + if info.Storage != nil && info.Storage["Bucket"] != "" { + objectBucket = info.Storage["Bucket"] } else { - // certain tests set info.ID in advance - objectId = info.ID + objectBucket = store.Bucket } // Convert meta data into a map of pointers for AWS Go SDK, sigh. @@ -326,8 +349,8 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand // Create the actual multipart upload t := time.Now() res, err := store.Service.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(objectId), + Bucket: aws.String(objectBucket), + Key: aws.String(objectKey), Metadata: metadata, }) store.observeRequestDuration(t, metricCreateMultipartUpload) @@ -336,15 +359,14 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand } multipartId := *res.UploadId - info.ID = objectId + "+" + multipartId - info.Storage = map[string]string{ - "Type": "s3store", - "Bucket": store.Bucket, - "Key": *store.keyWithPrefix(objectId), + "Type": "s3store", + "Bucket": objectBucket, + "Key": objectKey, + "MultipartUpload": multipartId, } - upload := &s3Upload{objectId, multipartId, &store, nil, []*s3Part{}, 0} + upload := &s3Upload{uploadId, objectKey, objectBucket, multipartId, &store, info, []*s3Part{}, 0} err = upload.writeInfo(ctx, info) if err != nil { return nil, fmt.Errorf("s3store: unable to create info file:\n%s", err) @@ -353,14 +375,29 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand return upload, nil } -func (store S3Store) GetUpload(ctx context.Context, id string) (handler.Upload, error) { - objectId, multipartId := splitIds(id) - if objectId == "" || multipartId == "" { - // If one of them is empty, it cannot be a valid ID. - return nil, handler.ErrNotFound +func (store S3Store) GetUpload(ctx context.Context, uploadId string) (handler.Upload, error) { + objectKey, objectBucket, multipartId, info, parts, incompletePartSize, err := store.fetchInfo(ctx, uploadId, nil) + if err != nil { + // Currently, s3store stores the multipart ID in the info object. However, in the past the + // multipart ID was part of the upload ID, which consisted of the object ID and multipart ID + // combined by a `+`. To maintain backwards compatibility with uploads that were created using + // previous tusd versions, we try to load an upload using the previous schema if we couldn't load + // an upload using the new schema. + // Note: LastIndex is used as the upload ID might also contain a plus sign on its own. 
+ lastPlusIndex := strings.LastIndex(uploadId, "+") + if errors.Is(err, handler.ErrNotFound) && lastPlusIndex != -1 { + fallbackMultipartId := uploadId[lastPlusIndex+1:] + uploadId = uploadId[:lastPlusIndex] + objectKey, objectBucket, multipartId, info, parts, incompletePartSize, err = store.fetchInfo(ctx, uploadId, &fallbackMultipartId) + if err != nil { + return nil, err + } + } else { + return nil, err + } } - return &s3Upload{objectId, multipartId, &store, nil, []*s3Part{}, 0}, nil + return &s3Upload{uploadId, objectKey, objectBucket, multipartId, &store, info, parts, incompletePartSize}, nil } func (store S3Store) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload { @@ -378,7 +415,7 @@ func (store S3Store) AsConcatableUpload(upload handler.Upload) handler.Concatabl func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error { store := upload.store - upload.info = &info + upload.info = info infoJson, err := json.Marshal(info) if err != nil { @@ -389,7 +426,7 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er t := time.Now() _, err = store.Service.PutObject(ctx, &s3.PutObjectInput{ Bucket: aws.String(store.Bucket), - Key: store.metadataKeyWithPrefix(upload.objectId + ".info"), + Key: store.metadataKeyWithPrefix(upload.uploadId + ".info"), Body: bytes.NewReader(infoJson), ContentLength: aws.Int64(int64(len(infoJson))), }) @@ -401,15 +438,10 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er func (upload *s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { store := upload.store - // Get the total size of the current upload, number of parts to generate next number and whether - // an incomplete part exists - _, _, incompletePartSize, err := upload.getInternalInfo(ctx) - if err != nil { - return 0, err - } - - if incompletePartSize > 0 { - incompletePartFile, err := store.downloadIncompletePartForUpload(ctx, upload.objectId) + // Check if we have an incomplete part that we must prepend. + var bytesFromIncompletePart int64 + if upload.incompletePartSize > 0 { + incompletePartFile, err := store.downloadIncompletePartForUpload(ctx, upload.uploadId) if err != nil { return 0, err } @@ -418,20 +450,24 @@ func (upload *s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Rea } defer cleanUpTempFile(incompletePartFile) - if err := store.deleteIncompletePartForUpload(ctx, upload.objectId); err != nil { + if err := store.deleteIncompletePartForUpload(ctx, upload.uploadId); err != nil { return 0, err } - // Prepend an incomplete part, if necessary and adapt the offset + // Reset `incompletePartSize` as we have just deleted the incomplete part. + bytesFromIncompletePart = upload.incompletePartSize + upload.incompletePartSize = 0 + + // Prepend an incomplete part and adapt the offset src = io.MultiReader(incompletePartFile, src) - offset = offset - incompletePartSize + offset = offset - bytesFromIncompletePart } bytesUploaded, err := upload.uploadParts(ctx, offset, src) // The size of the incomplete part should not be counted, because the // process of the incomplete part should be fully transparent to the user. 
- bytesUploaded = bytesUploaded - incompletePartSize + bytesUploaded = bytesUploaded - bytesFromIncompletePart if bytesUploaded < 0 { bytesUploaded = 0 } @@ -444,20 +480,14 @@ func (upload *s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Rea func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Reader) (int64, error) { store := upload.store - // Get the total size of the current upload and number of parts to generate next number - info, parts, _, err := upload.getInternalInfo(ctx) - if err != nil { - return 0, err - } - - size := info.Size + size := upload.info.Size bytesUploaded := int64(0) optimalPartSize, err := store.calcOptimalPartSize(size) if err != nil { return 0, err } - numParts := len(parts) + numParts := len(upload.parts) nextPartNum := int32(numParts + 1) partProducer, fileChan := newS3PartProducer(src, store.MaxBufferedParts, store.TemporaryDirectory, store.diskWriteDurationMetric) @@ -488,7 +518,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re partsize := fileChunk.size closePart := fileChunk.closeReader - isFinalChunk := !info.SizeIsDeferred && (size == offset+bytesUploaded+partsize) + isFinalChunk := !upload.info.SizeIsDeferred && (size == offset+bytesUploaded+partsize) if partsize >= store.MinPartSize || isFinalChunk { part := &s3Part{ etag: "", @@ -504,8 +534,8 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re t := time.Now() uploadPartInput := &s3.UploadPartInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(upload.objectId), + Bucket: aws.String(upload.objectBucket), + Key: aws.String(upload.objectKey), UploadId: aws.String(upload.multipartId), PartNumber: aws.Int32(part.number), } @@ -526,7 +556,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re defer upload.store.releaseUploadSemaphore() defer wg.Done() - if err := store.putIncompletePartForUpload(ctx, upload.objectId, file); err != nil { + if err := store.putIncompletePartForUpload(ctx, upload.uploadId, file); err != nil { uploadErr = err } if cerr := closePart(); cerr != nil && uploadErr == nil { @@ -608,82 +638,85 @@ func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s } func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) { - info, _, _, err = upload.getInternalInfo(ctx) - return info, err + return upload.info, nil } -func (upload *s3Upload) getInternalInfo(ctx context.Context) (info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) { - if upload.info != nil { - return *upload.info, upload.parts, upload.incompletePartSize, nil +func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMultipartId *string) (objectKey string, objectBucket string, multipartId string, info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) { + // Start by fetching the file info stored in a separate object. 
+ t := time.Now() + res, infoErr := store.Service.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(store.Bucket), + Key: store.metadataKeyWithPrefix(uploadId + ".info"), + }) + if infoErr != nil { + // If the info file is not found, we consider the upload to be non-existent + if isAwsError[*types.NoSuchKey](infoErr) { + err = handler.ErrNotFound + } else { + err = fmt.Errorf("s3store: failed to get info object: %w", infoErr) + } + return + } + store.observeRequestDuration(t, metricGetInfoObject) + + if jsonErr := json.NewDecoder(res.Body).Decode(&info); jsonErr != nil { + err = fmt.Errorf("s3store: failed to decode info object: %w", jsonErr) + return } - info, parts, incompletePartSize, err = upload.fetchInfo(ctx) - if err != nil { - return info, parts, incompletePartSize, err + if info.Storage != nil && info.Storage["Key"] != "" { + // Note: We do not need to add the prefix using store.keyWithPrefix because + // the value written by NewUpload already includes the prefix. + objectKey = info.Storage["Key"] + } else { + // If the info file does not include the object ID, fall back to using + // the upload ID, as was the default in earlier versions. + // Note: We add the prefix via store.keyWithPrefix because the ID does not include it. + objectKey = store.keyWithPrefix(uploadId) } - upload.info = &info - upload.parts = parts - upload.incompletePartSize = incompletePartSize - return info, parts, incompletePartSize, nil -} + if info.Storage != nil && info.Storage["Bucket"] != "" { + objectBucket = info.Storage["Bucket"] + } else { + objectBucket = store.Bucket + } -func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) { - store := upload.store + if info.Storage != nil && info.Storage["MultipartUpload"] != "" { + multipartId = info.Storage["MultipartUpload"] + } else if fallbackMultipartId != nil { + // If the info file does not include the multipart ID, we try to use + // the provided fallback or fail entirely. + multipartId = *fallbackMultipartId + } else { + err = errors.New("s3store: upload is missing multipart ID and is in an invalid state") + } + // Here, we just found an info file. Now we can check the multipart upload and + // any incomplete part concurrently. We store all errors in here and handle + // them all together once the wait group is done. var wg sync.WaitGroup - wg.Add(3) - - // We store all errors in here and handle them all together once the wait - // group is done. - var infoErr error + wg.Add(2) var partsErr error var incompletePartSizeErr error - go func() { - defer wg.Done() - t := time.Now() - - // Get file info stored in separate object - var res *s3.GetObjectOutput - res, infoErr = store.Service.GetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(store.Bucket), - Key: store.metadataKeyWithPrefix(upload.objectId + ".info"), - }) - store.observeRequestDuration(t, metricGetInfoObject) - if infoErr == nil { - infoErr = json.NewDecoder(res.Body).Decode(&info) - } - }() - go func() { defer wg.Done() // Get uploaded parts and their offset - parts, partsErr = store.listAllParts(ctx, upload.objectId, upload.multipartId) + parts, partsErr = store.listAllParts(ctx, objectKey, objectBucket, multipartId) }() go func() { defer wg.Done() // Get size of optional incomplete part file. 
- incompletePartSize, incompletePartSizeErr = store.headIncompletePartForUpload(ctx, upload.objectId) + incompletePartSize, incompletePartSizeErr = store.headIncompletePartForUpload(ctx, uploadId) }() wg.Wait() // Finally, after all requests are complete, let's handle the errors - if infoErr != nil { - err = infoErr - // If the info file is not found, we consider the upload to be non-existant - if isAwsError[*types.NoSuchKey](err) { - err = handler.ErrNotFound - } - return - } - if partsErr != nil { - err = partsErr // Check if the error is caused by the multipart upload not being found. This happens // when the multipart upload has already been completed or aborted. Since // we already found the info object, we know that the upload has been @@ -696,15 +729,17 @@ func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, pa // See https://github.com/aws/aws-sdk-go-v2/issues/1635 // In addition, S3-compatible storages, like DigitalOcean Spaces, might cause // types.NoSuchKey to not be returned as well. - if isAwsError[*types.NoSuchUpload](err) || isAwsErrorCode(err, "NoSuchUpload") || isAwsError[*types.NoSuchKey](err) || isAwsErrorCode(err, "NoSuchKey") { + if isAwsError[*types.NoSuchUpload](partsErr) || isAwsErrorCode(partsErr, "NoSuchUpload") || isAwsError[*types.NoSuchKey](partsErr) || isAwsErrorCode(partsErr, "NoSuchKey") { info.Offset = info.Size err = nil + } else { + err = fmt.Errorf("s3store: failed to list parts: %w", partsErr) } return } if incompletePartSizeErr != nil { - err = incompletePartSizeErr + err = fmt.Errorf("s3store: failed to head incomplete part: %w", incompletePartSizeErr) return } @@ -716,67 +751,45 @@ func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, pa info.Offset = offset - return info, parts, incompletePartSize, nil + return } func (upload s3Upload) GetReader(ctx context.Context) (io.ReadCloser, error) { - store := upload.store - - // Attempt to get upload content - res, err := store.Service.GetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(upload.objectId), - }) - if err == nil { - // No error occurred, and we are able to stream the object - return res.Body, nil - } - - // If the file cannot be found, we ignore this error and continue since the - // upload may not have been finished yet. In this case we do not want to - // return a ErrNotFound but a more meaning-full message. - if !isAwsError[*types.NoSuchKey](err) { - return nil, err - } - - // Test whether the multipart upload exists to find out if the upload - // never existsted or just has not been finished yet - _, err = store.Service.ListParts(ctx, &s3.ListPartsInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(upload.objectId), - UploadId: aws.String(upload.multipartId), - MaxParts: aws.Int32(0), - }) - if err == nil { - // The multipart upload still exists, which means we cannot download it yet + // If the upload is not yet complete, we cannot download the file. There is no way to retrieve + // the content of an incomplete multipart upload. + isComplete := !upload.info.SizeIsDeferred && upload.info.Offset == upload.info.Size + if !isComplete { return nil, handler.NewError("ERR_INCOMPLETE_UPLOAD", "cannot stream non-finished upload", http.StatusBadRequest) } - // The AWS Go SDK v2 has a bug where types.NoSuchUpload is not returned, - // so we also need to check the error code itself. 
- // See https://github.com/aws/aws-sdk-go-v2/issues/1635 - if isAwsError[*types.NoSuchUpload](err) || isAwsErrorCode(err, "NoSuchUpload") { - // Neither the object nor the multipart upload exists, so we return a 404 - return nil, handler.ErrNotFound + store := upload.store + res, err := store.Service.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(upload.objectBucket), + Key: aws.String(upload.objectKey), + }) + if err != nil { + // Note: We do not check for NoSuchKey here on purpose. If the object cannot be found + // but we expect it to be there, then we should error out with a 500, not a 404. + return nil, fmt.Errorf("s3store: failed to fetch object: %w", err) } - return nil, err + return res.Body, nil } func (upload s3Upload) Terminate(ctx context.Context) error { store := upload.store var wg sync.WaitGroup - wg.Add(2) - errs := make([]error, 0, 3) + wg.Add(3) + errs := make([]error, 0, 4) go func() { defer wg.Done() // Abort the multipart upload _, err := store.Service.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(upload.objectId), + Bucket: aws.String(upload.objectBucket), + Key: aws.String(upload.objectKey), UploadId: aws.String(upload.multipartId), }) if err != nil && !isAwsError[*types.NoSuchUpload](err) { @@ -787,19 +800,34 @@ func (upload s3Upload) Terminate(ctx context.Context) error { go func() { defer wg.Done() - // Delete the info and content files + // Delete the object itself + _, err := store.Service.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(upload.objectBucket), + Key: aws.String(upload.objectKey), + }) + + if isAwsError[*types.NoSuchKey](err) || isAwsErrorCode(err, "NoSuchKey") { + err = nil + } + + if err != nil { + errs = append(errs, err) + } + }() + + go func() { + defer wg.Done() + + // Delete the info and part files res, err := store.Service.DeleteObjects(ctx, &s3.DeleteObjectsInput{ Bucket: aws.String(store.Bucket), Delete: &types.Delete{ Objects: []types.ObjectIdentifier{ { - Key: store.keyWithPrefix(upload.objectId), - }, - { - Key: store.metadataKeyWithPrefix(upload.objectId + ".part"), + Key: store.metadataKeyWithPrefix(upload.uploadId + ".part"), }, { - Key: store.metadataKeyWithPrefix(upload.objectId + ".info"), + Key: store.metadataKeyWithPrefix(upload.uploadId + ".info"), }, }, Quiet: aws.Bool(true), @@ -830,19 +858,13 @@ func (upload s3Upload) Terminate(ctx context.Context) error { func (upload s3Upload) FinishUpload(ctx context.Context) error { store := upload.store - // Get uploaded parts - _, parts, _, err := upload.getInternalInfo(ctx) - if err != nil { - return err - } - - if len(parts) == 0 { + if len(upload.parts) == 0 { // AWS expects at least one part to be present when completing the multipart // upload. So if the tus upload has a size of 0, we create an empty part // and use that for completing the multipart upload. 
res, err := store.Service.UploadPart(ctx, &s3.UploadPartInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(upload.objectId), + Bucket: aws.String(upload.objectBucket), + Key: aws.String(upload.objectKey), UploadId: aws.String(upload.multipartId), PartNumber: aws.Int32(1), Body: bytes.NewReader([]byte{}), @@ -851,7 +873,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error { return err } - parts = []*s3Part{ + upload.parts = []*s3Part{ { etag: *res.ETag, number: 1, @@ -863,9 +885,9 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error { // Transform the []*s3.Part slice to a []*s3.CompletedPart slice for the next // request. - completedParts := make([]types.CompletedPart, len(parts)) + completedParts := make([]types.CompletedPart, len(upload.parts)) - for index, part := range parts { + for index, part := range upload.parts { completedParts[index] = types.CompletedPart{ ETag: aws.String(part.etag), PartNumber: aws.Int32(part.number), @@ -873,9 +895,9 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error { } t := time.Now() - _, err = store.Service.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(upload.objectId), + _, err := store.Service.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(upload.objectBucket), + Key: aws.String(upload.objectKey), UploadId: aws.String(upload.multipartId), MultipartUpload: &types.CompletedMultipartUpload{ Parts: completedParts, @@ -925,8 +947,8 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads partialS3Upload := partialUpload.(*s3Upload) res, err := store.Service.GetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(partialS3Upload.objectId), + Bucket: aws.String(partialS3Upload.objectBucket), + Key: aws.String(partialS3Upload.objectKey), }) if err != nil { return err @@ -943,8 +965,8 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads // Upload the entire file to S3 _, err = store.Service.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(upload.objectId), + Bucket: aws.String(upload.objectBucket), + Key: aws.String(upload.objectKey), Body: file, }) if err != nil { @@ -957,8 +979,8 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads // the request. 
go func() { store.Service.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(upload.objectId), + Bucket: aws.String(upload.objectBucket), + Key: aws.String(upload.objectKey), UploadId: aws.String(upload.multipartId), }) }() @@ -987,15 +1009,16 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads etag: "", }) - go func(partNumber int32, sourceObject string) { + source := partialS3Upload.objectBucket + "/" + partialS3Upload.objectKey + go func(partNumber int32, source string) { defer wg.Done() res, err := store.Service.UploadPartCopy(ctx, &s3.UploadPartCopyInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(upload.objectId), + Bucket: aws.String(upload.objectBucket), + Key: aws.String(upload.objectKey), UploadId: aws.String(upload.multipartId), PartNumber: aws.Int32(partNumber), - CopySource: aws.String(store.Bucket + "/" + *store.keyWithPrefix(sourceObject)), + CopySource: aws.String(source), }) if err != nil { errs = append(errs, err) @@ -1003,7 +1026,7 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads } upload.parts[partNumber-1].etag = *res.CopyPartResult.ETag - }(partNumber, partialS3Upload.objectId) + }(partNumber, source) } wg.Wait() @@ -1026,15 +1049,15 @@ func (upload *s3Upload) DeclareLength(ctx context.Context, length int64) error { return upload.writeInfo(ctx, info) } -func (store S3Store) listAllParts(ctx context.Context, objectId string, multipartId string) (parts []*s3Part, err error) { +func (store S3Store) listAllParts(ctx context.Context, objectKey string, objectBucket string, multipartId string) (parts []*s3Part, err error) { var partMarker *string for { t := time.Now() // Get uploaded parts listPtr, err := store.Service.ListParts(ctx, &s3.ListPartsInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(objectId), + Bucket: aws.String(objectBucket), + Key: aws.String(objectKey), UploadId: aws.String(multipartId), PartNumberMarker: partMarker, }) @@ -1147,19 +1170,6 @@ func (store S3Store) deleteIncompletePartForUpload(ctx context.Context, uploadId return err } -func splitIds(id string) (objectId, multipartId string) { - // We use LastIndex to allow plus signs in the object ID and assume that S3 will never - // returns multipart ID that incldues a plus sign. - index := strings.LastIndex(id, "+") - if index == -1 { - return - } - - objectId = id[:index] - multipartId = id[index+1:] - return -} - // isAwsError tests whether an error object is an instance of the AWS error // specified by its code. 
func isAwsError[T error](err error) bool { @@ -1219,13 +1229,13 @@ func (store S3Store) calcOptimalPartSize(size int64) (optimalPartSize int64, err return optimalPartSize, nil } -func (store S3Store) keyWithPrefix(key string) *string { +func (store S3Store) keyWithPrefix(key string) string { prefix := store.ObjectPrefix if prefix != "" && !strings.HasSuffix(prefix, "/") { prefix += "/" } - return aws.String(prefix + key) + return prefix + key } func (store S3Store) metadataKeyWithPrefix(key string) *string { diff --git a/pkg/s3store/s3store_test.go b/pkg/s3store/s3store_test.go index 2b3787f0..f4be6a6f 100644 --- a/pkg/s3store/s3store_test.go +++ b/pkg/s3store/s3store_test.go @@ -54,8 +54,8 @@ func TestNewUpload(t *testing.T) { s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), - Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{"bar":"menü\r\nhi","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)), - ContentLength: aws.Int64(241), + Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{"bar":"menü\r\nhi","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`)), + ContentLength: aws.Int64(261), }), ) @@ -73,97 +73,6 @@ func TestNewUpload(t *testing.T) { assert.NotNil(upload) } -func TestNewUploadWithObjectPrefix(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - assert := assert.New(t) - - s3obj := NewMockS3API(mockCtrl) - store := New("bucket", s3obj) - store.ObjectPrefix = "my/uploaded/files" - - assert.Equal("bucket", store.Bucket) - assert.Equal(s3obj, store.Service) - - gomock.InOrder( - s3obj.EXPECT().CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{ - Bucket: aws.String("bucket"), - Key: aws.String("my/uploaded/files/uploadId"), - Metadata: map[string]string{ - "foo": "hello", - "bar": "men?", - }, - }).Return(&s3.CreateMultipartUploadOutput{ - UploadId: aws.String("multipartId"), - }, nil), - s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("my/uploaded/files/uploadId.info"), - Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","Type":"s3store"}}`)), - ContentLength: aws.Int64(253), - }), - ) - - info := handler.FileInfo{ - ID: "uploadId", - Size: 500, - MetaData: map[string]string{ - "foo": "hello", - "bar": "menü", - }, - } - - upload, err := store.NewUpload(context.Background(), info) - assert.Nil(err) - assert.NotNil(upload) -} - -func TestNewUploadWithMetadataObjectPrefix(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - assert := assert.New(t) - - s3obj := NewMockS3API(mockCtrl) - store := New("bucket", s3obj) - store.ObjectPrefix = "my/uploaded/files" - store.MetadataObjectPrefix = "my/metadata" - - assert.Equal("bucket", store.Bucket) - assert.Equal(s3obj, store.Service) - - gomock.InOrder( - s3obj.EXPECT().CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{ - Bucket: aws.String("bucket"), - Key: 
aws.String("my/uploaded/files/uploadId"), - Metadata: map[string]string{ - "foo": "hello", - "bar": "men?", - }, - }).Return(&s3.CreateMultipartUploadOutput{ - UploadId: aws.String("multipartId"), - }, nil), - s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("my/metadata/uploadId.info"), - Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","Type":"s3store"}}`)), - ContentLength: aws.Int64(253), - }), - ) - - info := handler.FileInfo{ - ID: "uploadId", - Size: 500, - MetaData: map[string]string{ - "foo": "hello", - "bar": "menü", - }, - } - - upload, err := store.NewUpload(context.Background(), info) - assert.Nil(err) - assert.NotNil(upload) -} - // This test ensures that an newly created upload without any chunks can be // directly finished. There are no calls to ListPart or HeadObject because // the upload is not fetched from S3 first. @@ -177,7 +86,7 @@ func TestEmptyUpload(t *testing.T) { gomock.InOrder( s3obj.EXPECT().CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), Metadata: map[string]string{}, }).Return(&s3.CreateMultipartUploadOutput{ @@ -186,11 +95,11 @@ func TestEmptyUpload(t *testing.T) { s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), - Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":false,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)), - ContentLength: aws.Int64(208), + Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":0,"SizeIsDeferred":false,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`)), + ContentLength: aws.Int64(235), }), s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumber: aws.Int32(1), @@ -199,7 +108,7 @@ func TestEmptyUpload(t *testing.T) { ETag: aws.String("etag"), }, nil), s3obj.EXPECT().CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), MultipartUpload: &types.CompletedMultipartUpload{ @@ -216,6 +125,9 @@ func TestEmptyUpload(t *testing.T) { info := handler.FileInfo{ ID: "uploadId", Size: 0, + Storage: map[string]string{ + "Bucket": "custom-bucket", + }, } upload, err := store.NewUpload(context.Background(), info) @@ -260,22 +172,9 @@ func TestGetInfoNotFound(t *testing.T) { Key: aws.String("uploadId.info"), }).Return(nil, &types.NoSuchKey{}) - s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: nil, - }).Return(nil, &types.NoSuchUpload{}) - s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ - 
Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(nil, &types.NoSuchKey{}) - - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") - assert.Nil(err) - - _, err = upload.GetInfo(context.Background()) + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Equal(handler.ErrNotFound, err) + assert.Equal(nil, upload) } func TestGetInfo(t *testing.T) { @@ -290,11 +189,11 @@ func TestGetInfo(t *testing.T) { Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","Type":"s3store"}}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"my/uploaded/files/uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), + Bucket: aws.String("custom-bucket"), + Key: aws.String("my/uploaded/files/uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: nil, }).Return(&s3.ListPartsOutput{ @@ -315,8 +214,8 @@ func TestGetInfo(t *testing.T) { IsTruncated: aws.Bool(true), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), + Bucket: aws.String("custom-bucket"), + Key: aws.String("my/uploaded/files/uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: aws.String("2"), }).Return(&s3.ListPartsOutput{ @@ -333,41 +232,121 @@ func TestGetInfo(t *testing.T) { Key: aws.String("uploadId.part"), }).Return(nil, &types.NoSuchKey{}) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) info, err := upload.GetInfo(context.Background()) assert.Nil(err) assert.Equal(int64(500), info.Size) assert.Equal(int64(400), info.Offset) - assert.Equal("uploadId+multipartId", info.ID) + assert.Equal("uploadId", info.ID) assert.Equal("hello", info.MetaData["foo"]) assert.Equal("menü", info.MetaData["bar"]) assert.Equal("s3store", info.Storage["Type"]) - assert.Equal("bucket", info.Storage["Bucket"]) + assert.Equal("custom-bucket", info.Storage["Bucket"]) assert.Equal("my/uploaded/files/uploadId", info.Storage["Key"]) + assert.Equal("multipartId", info.Storage["MultipartUpload"]) } -func TestGetInfoWithMetadataObjectPrefix(t *testing.T) { +func TestGetInfoWithIncompletePart(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() assert := assert.New(t) s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) - store.MetadataObjectPrefix = "my/metadata" s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ Bucket: aws.String("bucket"), - Key: aws.String("my/metadata/uploadId.info"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), + }, nil) + 
s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("custom-bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: nil, + }).Return(&s3.ListPartsOutput{Parts: []types.Part{}}, nil) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(10), + }, nil) + + upload, err := store.GetUpload(context.Background(), "uploadId") + assert.Nil(err) + + info, err := upload.GetInfo(context.Background()) + assert.Nil(err) + assert.Equal(int64(10), info.Offset) + assert.Equal("uploadId", info.ID) +} + +func TestGetInfoFinished(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","Type":"s3store"}}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: nil, + }).Return(nil, &types.NoSuchUpload{}) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, &types.NoSuchKey{}) + + upload, err := store.GetUpload(context.Background(), "uploadId") + assert.Nil(err) + + info, err := upload.GetInfo(context.Background()) + assert.Nil(err) + assert.Equal(int64(500), info.Size) + assert.Equal(int64(500), info.Offset) +} + +// TestGetInfoWithOldIdFormat asserts that GetUpload falls back to extracting +// the multipart ID from the upload ID, if it's not found in the info object. +// This is done to be compatible with previous tusd versions. +// The upload ID includes an additional plus sign, which might have been set via +// a pre-create hook. The test ensures that this plus sign is properly treated. 
+func TestGetInfoWithOldIdFormat(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("upload+id+multipartId.info"), + }).Return(nil, &types.NoSuchKey{}) + + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("upload+id.info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"upload+id+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"upload+id","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("upload+id"), + UploadId: aws.String("multipartId"), + PartNumberMarker: nil, }).Return(&s3.ListPartsOutput{ Parts: []types.Part{ { @@ -387,7 +366,7 @@ func TestGetInfoWithMetadataObjectPrefix(t *testing.T) { }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), + Key: aws.String("upload+id"), UploadId: aws.String("multipartId"), PartNumberMarker: aws.String("2"), }).Return(&s3.ListPartsOutput{ @@ -401,25 +380,25 @@ func TestGetInfoWithMetadataObjectPrefix(t *testing.T) { }, nil) s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ Bucket: aws.String("bucket"), - Key: aws.String("my/metadata/uploadId.part"), + Key: aws.String("upload+id.part"), }).Return(nil, &types.NoSuchKey{}) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "upload+id+multipartId") assert.Nil(err) info, err := upload.GetInfo(context.Background()) assert.Nil(err) assert.Equal(int64(500), info.Size) assert.Equal(int64(400), info.Offset) - assert.Equal("uploadId+multipartId", info.ID) + assert.Equal("upload+id+multipartId", info.ID) assert.Equal("hello", info.MetaData["foo"]) assert.Equal("menü", info.MetaData["bar"]) assert.Equal("s3store", info.Storage["Type"]) assert.Equal("bucket", info.Storage["Bucket"]) - assert.Equal("my/uploaded/files/uploadId", info.Storage["Key"]) + assert.Equal("upload+id", info.Storage["Key"]) } -func TestGetInfoWithIncompletePart(t *testing.T) { +func TestGetReader(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() assert := assert.New(t) @@ -431,67 +410,34 @@ func TestGetInfoWithIncompletePart(t *testing.T) { Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":12,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: nil, - }).Return(&s3.ListPartsOutput{Parts: []types.Part{}}, nil) + 
}).Return(nil, &types.NoSuchUpload{}) s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.part"), - }).Return(&s3.HeadObjectOutput{ - ContentLength: aws.Int64(10), - }, nil) - - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") - assert.Nil(err) - - info, err := upload.GetInfo(context.Background()) - assert.Nil(err) - assert.Equal(int64(10), info.Offset) - assert.Equal("uploadId+multipartId", info.ID) -} - -func TestGetInfoFinished(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - assert := assert.New(t) - - s3obj := NewMockS3API(mockCtrl) - store := New("bucket", s3obj) - + }).Return(nil, &types.NoSuchKey{}) s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), + Bucket: aws.String("custom-bucket"), + Key: aws.String("uploadId"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`hello world`))), }, nil) - s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: nil, - }).Return(nil, &types.NoSuchUpload{}) - s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(nil, &types.NoSuchKey{}) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) - info, err := upload.GetInfo(context.Background()) + content, err := upload.GetReader(context.Background()) assert.Nil(err) - assert.Equal(int64(500), info.Size) - assert.Equal(int64(500), info.Offset) + assert.Equal(io.NopCloser(bytes.NewReader([]byte(`hello world`))), content) } -// TestGetInfoWithPlusSign ensures that s3store can handle a plus sign in the object ID. -// Currently the plus sign is used to separate the object ID and multipart ID. 
-func TestGetInfoWithPlusSign(t *testing.T) { +func TestGetReaderNotFinished(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() assert := assert.New(t) @@ -499,31 +445,15 @@ func TestGetInfoWithPlusSign(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) - gomock.InOrder( - s3obj.EXPECT().CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId+something"), - Metadata: map[string]string{}, - }).Return(&s3.CreateMultipartUploadOutput{ - UploadId: aws.String("multipartId"), - }, nil), - s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId+something.info"), - Body: bytes.NewReader([]byte(`{"ID":"uploadId+something+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId+something","Type":"s3store"}}`)), - ContentLength: aws.Int64(228), - }), - ) - s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ Bucket: aws.String("bucket"), - Key: aws.String("uploadId+something.info"), + Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+something+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId+something","Type":"s3store"}}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ Bucket: aws.String("bucket"), - Key: aws.String("uploadId+something"), + Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: nil, }).Return(&s3.ListPartsOutput{ @@ -543,31 +473,18 @@ func TestGetInfoWithPlusSign(t *testing.T) { }, nil) s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ Bucket: aws.String("bucket"), - Key: aws.String("uploadId+something.part"), + Key: aws.String("uploadId.part"), }).Return(nil, &types.NoSuchKey{}) - info1 := handler.FileInfo{ - ID: "uploadId+something", - Size: 500, - MetaData: map[string]string{}, - } - - upload1, err := store.NewUpload(context.Background(), info1) - assert.Nil(err) - assert.NotNil(upload1) - - upload2, err := store.GetUpload(context.Background(), "uploadId+something+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) - info2, err := upload2.GetInfo(context.Background()) - assert.Nil(err) - assert.Equal(int64(500), info2.Size) - assert.Equal(int64(300), info2.Offset) - assert.Equal("uploadId+something+multipartId", info2.ID) - assert.Equal("uploadId+something", info2.Storage["Key"]) + content, err := upload.GetReader(context.Background()) + assert.Nil(content) + assert.Equal("ERR_INCOMPLETE_UPLOAD: cannot stream non-finished upload", err.Error()) } -func TestGetReader(t *testing.T) { +func TestDeclareLength(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() assert := assert.New(t) @@ -577,95 +494,12 @@ func TestGetReader(t *testing.T) { s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ Bucket: aws.String("bucket"), - Key: 
aws.String("uploadId"), + Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`hello world`))), - }, nil) - - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") - assert.Nil(err) - - content, err := upload.GetReader(context.Background()) - assert.Nil(err) - assert.Equal(io.NopCloser(bytes.NewReader([]byte(`hello world`))), content) -} - -func TestGetReaderNotFound(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - assert := assert.New(t) - - s3obj := NewMockS3API(mockCtrl) - store := New("bucket", s3obj) - - gomock.InOrder( - s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - }).Return(nil, &types.NoSuchKey{}), - s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - MaxParts: aws.Int32(0), - }).Return(nil, &types.NoSuchUpload{}), - ) - - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") - assert.Nil(err) - - content, err := upload.GetReader(context.Background()) - assert.Nil(content) - assert.Equal(handler.ErrNotFound, err) -} - -func TestGetReaderNotFinished(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - assert := assert.New(t) - - s3obj := NewMockS3API(mockCtrl) - store := New("bucket", s3obj) - - gomock.InOrder( - s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - }).Return(nil, &types.NoSuchKey{}), - s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - MaxParts: aws.Int32(0), - }).Return(&s3.ListPartsOutput{ - Parts: []types.Part{}, - }, nil), - ) - - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") - assert.Nil(err) - - content, err := upload.GetReader(context.Background()) - assert.Nil(content) - assert.Equal("ERR_INCOMPLETE_UPLOAD: cannot stream non-finished upload", err.Error()) -} - -func TestDeclareLength(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - assert := assert.New(t) - - s3obj := NewMockS3API(mockCtrl) - store := New("bucket", s3obj) - - s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":true,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":0,"SizeIsDeferred":true,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: nil, @@ -679,11 +513,11 @@ func TestDeclareLength(t *testing.T) { s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{ Bucket: aws.String("bucket"), Key: 
aws.String("uploadId.info"), - Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)), - ContentLength: aws.Int64(208), + Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`)), + ContentLength: aws.Int64(235), }) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) err = store.AsLengthDeclarableUpload(upload).DeclareLength(context.Background(), 500) @@ -705,10 +539,10 @@ func TestFinishUpload(t *testing.T) { Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":400,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":400,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: nil, @@ -729,7 +563,7 @@ func TestFinishUpload(t *testing.T) { IsTruncated: aws.Bool(true), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: aws.String("2"), @@ -747,7 +581,7 @@ func TestFinishUpload(t *testing.T) { Key: aws.String("uploadId.part"), }).Return(nil, &types.NotFound{}) s3obj.EXPECT().CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), MultipartUpload: &types.CompletedMultipartUpload{ @@ -768,7 +602,7 @@ func TestFinishUpload(t *testing.T) { }, }).Return(nil, nil) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) err = upload.FinishUpload(context.Background()) @@ -793,10 +627,10 @@ func TestWriteChunk(t *testing.T) { Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: 
nil, @@ -821,7 +655,7 @@ func TestWriteChunk(t *testing.T) { // From WriteChunk s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumber: aws.Int32(3), @@ -830,7 +664,7 @@ func TestWriteChunk(t *testing.T) { ETag: aws.String("etag-3"), }, nil) s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumber: aws.Int32(4), @@ -839,7 +673,7 @@ func TestWriteChunk(t *testing.T) { ETag: aws.String("etag-4"), }, nil) s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumber: aws.Int32(5), @@ -853,7 +687,7 @@ func TestWriteChunk(t *testing.T) { Body: bytes.NewReader([]byte("CD")), })).Return(nil, nil) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) bytesRead, err := upload.WriteChunk(context.Background(), 300, bytes.NewReader([]byte("1234567890ABCD"))) @@ -873,7 +707,7 @@ func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) { Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ Bucket: aws.String("bucket"), @@ -905,7 +739,7 @@ func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) { Body: bytes.NewReader([]byte("1234567890")), })).Return(nil, nil) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) bytesRead, err := upload.WriteChunk(context.Background(), 300, bytes.NewReader([]byte("1234567890"))) @@ -930,10 +764,10 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) { Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":5,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":5,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: nil, @@ -959,7 +793,7 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) { }).Return(&s3.DeleteObjectOutput{}, nil) 
s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumber: aws.Int32(1), @@ -968,7 +802,7 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) { ETag: aws.String("etag-1"), }, nil) s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumber: aws.Int32(2), @@ -977,7 +811,7 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) { ETag: aws.String("etag-2"), }, nil) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) bytesRead, err := upload.WriteChunk(context.Background(), 3, bytes.NewReader([]byte("45"))) @@ -1002,10 +836,10 @@ func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing. Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":10,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":10,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: nil, @@ -1029,7 +863,7 @@ func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing. }).Return(&s3.DeleteObjectOutput{}, nil) s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumber: aws.Int32(1), @@ -1043,7 +877,7 @@ func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing. 
Body: bytes.NewReader([]byte("5")), })).Return(nil, nil) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) bytesRead, err := upload.WriteChunk(context.Background(), 3, bytes.NewReader([]byte("45"))) @@ -1064,10 +898,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: nil, @@ -1090,7 +924,7 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { Key: aws.String("uploadId.part"), }).Return(nil, &smithy.GenericAPIError{Code: "AccessDenied", Message: "Access Denied."}) s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumber: aws.Int32(3), @@ -1099,7 +933,7 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { ETag: aws.String("etag-3"), }, nil) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) // 10 bytes are missing for the upload to be finished (offset at 490 for 500 @@ -1118,20 +952,52 @@ func TestTerminate(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) - // Order is not important in this situation. 
+ s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("custom-bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: nil, + }).Return(&s3.ListPartsOutput{ + Parts: []types.Part{ + { + PartNumber: aws.Int32(1), + Size: aws.Int64(100), + ETag: aws.String("etag-1"), + }, + { + PartNumber: aws.Int32(2), + Size: aws.Int64(200), + ETag: aws.String("etag-2"), + }, + }, + IsTruncated: aws.Bool(false), + }, nil) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, &types.NoSuchKey{}) + s3obj.EXPECT().AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), }).Return(nil, nil) + s3obj.EXPECT().DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: aws.String("custom-bucket"), + Key: aws.String("uploadId"), + }).Return(&s3.DeleteObjectOutput{}, nil) + s3obj.EXPECT().DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ Bucket: aws.String("bucket"), Delete: &types.Delete{ Objects: []types.ObjectIdentifier{ - { - Key: aws.String("uploadId"), - }, { Key: aws.String("uploadId.part"), }, @@ -1143,7 +1009,7 @@ func TestTerminate(t *testing.T) { }, }).Return(&s3.DeleteObjectsOutput{}, nil) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) err = store.AsTerminatableUpload(upload).Terminate(context.Background()) @@ -1158,21 +1024,53 @@ func TestTerminateWithErrors(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) - // Order is not important in this situation. 
- // NoSuchUpload errors should be ignored + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("custom-bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: nil, + }).Return(&s3.ListPartsOutput{ + Parts: []types.Part{ + { + PartNumber: aws.Int32(1), + Size: aws.Int64(100), + ETag: aws.String("etag-1"), + }, + { + PartNumber: aws.Int32(2), + Size: aws.Int64(200), + ETag: aws.String("etag-2"), + }, + }, + IsTruncated: aws.Bool(false), + }, nil) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, &types.NoSuchKey{}) + + // These NoSuchUpload and NoSuchKey errors should be ignored s3obj.EXPECT().AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), }).Return(nil, &types.NoSuchUpload{}) + s3obj.EXPECT().DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: aws.String("custom-bucket"), + Key: aws.String("uploadId"), + }).Return(nil, &types.NoSuchKey{}) + s3obj.EXPECT().DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ Bucket: aws.String("bucket"), Delete: &types.Delete{ Objects: []types.ObjectIdentifier{ - { - Key: aws.String("uploadId"), - }, { Key: aws.String("uploadId.part"), }, @@ -1189,10 +1087,14 @@ func TestTerminateWithErrors(t *testing.T) { Key: aws.String("uploadId"), Message: aws.String("it's me."), }, + { + Code: aws.String("NoSuchKey"), + Key: aws.String("uploadId.part"), + }, }, }, nil) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) err = store.AsTerminatableUpload(upload).Terminate(context.Background()) @@ -1206,11 +1108,12 @@ func TestConcatUploadsUsingMultipart(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) + // All partial uploads have a size (500) larger than the MinPartSize, so a S3 Multipart Upload is used for concatenation. 
store.MinPartSize = 100 // Calls from NewUpload s3obj.EXPECT().CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket-1"), Key: aws.String("uploadId"), Metadata: map[string]string{}, }).Return(&s3.CreateMultipartUploadOutput{ @@ -1219,16 +1122,36 @@ func TestConcatUploadsUsingMultipart(t *testing.T) { s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), - Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":false,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":true,"PartialUploads":["aaa+AAA","bbb+BBB","ccc+CCC"],"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)), - ContentLength: aws.Int64(234), + Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":1500,"SizeIsDeferred":false,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":true,"PartialUploads":["uploadA","uploadB","uploadC"],"Storage":{"Bucket":"custom-bucket-1","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`)), + ContentLength: aws.Int64(266), }) + // Calls from GetUpload + for _, id := range []string{"uploadA", "uploadB", "uploadC"} { + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String(id + ".info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"` + id + `","Size":500,"Offset":0,"MetaData":null,"IsPartial":true,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket-2","Key":"` + id + `","MultipartUpload":"multipart` + id + `","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("custom-bucket-2"), + Key: aws.String(id), + UploadId: aws.String("multipart" + id), + PartNumberMarker: nil, + }).Return(nil, &types.NoSuchUpload{}) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String(id + ".part"), + }).Return(nil, &types.NoSuchKey{}) + } + // Calls from ConcatUploads s3obj.EXPECT().UploadPartCopy(context.Background(), &s3.UploadPartCopyInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket-1"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), - CopySource: aws.String("bucket/aaa"), + CopySource: aws.String("custom-bucket-2/uploadA"), PartNumber: aws.Int32(1), }).Return(&s3.UploadPartCopyOutput{ CopyPartResult: &types.CopyPartResult{ @@ -1237,10 +1160,10 @@ func TestConcatUploadsUsingMultipart(t *testing.T) { }, nil) s3obj.EXPECT().UploadPartCopy(context.Background(), &s3.UploadPartCopyInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket-1"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), - CopySource: aws.String("bucket/bbb"), + CopySource: aws.String("custom-bucket-2/uploadB"), PartNumber: aws.Int32(2), }).Return(&s3.UploadPartCopyOutput{ CopyPartResult: &types.CopyPartResult{ @@ -1249,10 +1172,10 @@ func TestConcatUploadsUsingMultipart(t *testing.T) { }, nil) s3obj.EXPECT().UploadPartCopy(context.Background(), &s3.UploadPartCopyInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket-1"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), - CopySource: aws.String("bucket/ccc"), + CopySource: aws.String("custom-bucket-2/uploadC"), PartNumber: aws.Int32(3), }).Return(&s3.UploadPartCopyOutput{ CopyPartResult: 
&types.CopyPartResult{ @@ -1262,7 +1185,7 @@ func TestConcatUploadsUsingMultipart(t *testing.T) { // Calls from FinishUpload s3obj.EXPECT().CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{ - Bucket: aws.String("bucket"), + Bucket: aws.String("custom-bucket-1"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), MultipartUpload: &types.CompletedMultipartUpload{ @@ -1285,28 +1208,27 @@ func TestConcatUploadsUsingMultipart(t *testing.T) { info := handler.FileInfo{ ID: "uploadId", + Size: 1500, IsFinal: true, PartialUploads: []string{ - "aaa+AAA", - "bbb+BBB", - "ccc+CCC", + "uploadA", + "uploadB", + "uploadC", + }, + Storage: map[string]string{ + "Bucket": "custom-bucket-1", }, } upload, err := store.NewUpload(context.Background(), info) assert.Nil(err) - uploadA, err := store.GetUpload(context.Background(), "aaa+AAA") + uploadA, err := store.GetUpload(context.Background(), "uploadA") assert.Nil(err) - uploadB, err := store.GetUpload(context.Background(), "bbb+BBB") + uploadB, err := store.GetUpload(context.Background(), "uploadB") assert.Nil(err) - uploadC, err := store.GetUpload(context.Background(), "ccc+CCC") + uploadC, err := store.GetUpload(context.Background(), "uploadC") assert.Nil(err) - // All uploads have a size larger than the MinPartSize, so a S3 Multipart Upload is used for concatenation. - uploadA.(*s3Upload).info = &handler.FileInfo{Size: 500} - uploadB.(*s3Upload).info = &handler.FileInfo{Size: 500} - uploadC.(*s3Upload).info = &handler.FileInfo{Size: 500} - err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{ uploadA, uploadB, @@ -1322,54 +1244,90 @@ func TestConcatUploadsUsingDownload(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) + // All partial uploads have a size (3, 4, 5) smaller than the MinPartSize, so the files are downloaded for concatenation. 
store.MinPartSize = 100 - gomock.InOrder( - s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("aaa"), - }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte("aaa"))), - }, nil), - s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("bbb"), - }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte("bbbb"))), - }, nil), + // Calls from GetUpload for final upload + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":12,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":true,"PartialUploads":["uploadA","uploadB","uploadC"],"Storage":{"Bucket":"custom-bucket-1","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("custom-bucket-1"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: nil, + }).Return(&s3.ListPartsOutput{ + Parts: []types.Part{}, + IsTruncated: aws.Bool(false), + }, nil) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, &types.NoSuchKey{}) + + // Calls from GetUpload for partial uploads + for id, size := range map[string]string{"uploadA": "3", "uploadB": "4", "uploadC": "5"} { s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ Bucket: aws.String("bucket"), - Key: aws.String("ccc"), + Key: aws.String(id + ".info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte("ccccc"))), - }, nil), - s3obj.EXPECT().PutObject(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"` + id + `","Size":` + size + `,"Offset":0,"MetaData":null,"IsPartial":true,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket-2","Key":"` + id + `","MultipartUpload":"multipart` + id + `","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("custom-bucket-2"), + Key: aws.String(id), + UploadId: aws.String("multipart" + id), + PartNumberMarker: nil, + }).Return(nil, &types.NoSuchUpload{}) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - Body: bytes.NewReader([]byte("aaabbbbccccc")), - })), - s3obj.EXPECT().AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - }).Return(nil, nil), - ) + Key: aws.String(id + ".part"), + }).Return(nil, &types.NoSuchKey{}) + } - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + // Calls from ConcatUploads + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("custom-bucket-2"), + Key: aws.String("uploadA"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("aaa"))), + }, nil) + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("custom-bucket-2"), + Key: aws.String("uploadB"), + }).Return(&s3.GetObjectOutput{ + Body: 
io.NopCloser(bytes.NewReader([]byte("bbbb"))), + }, nil) + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("custom-bucket-2"), + Key: aws.String("uploadC"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("ccccc"))), + }, nil) + s3obj.EXPECT().PutObject(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("custom-bucket-1"), + Key: aws.String("uploadId"), + Body: bytes.NewReader([]byte("aaabbbbccccc")), + })) + s3obj.EXPECT().AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{ + Bucket: aws.String("custom-bucket-1"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + }).Return(nil, nil) + + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) - uploadA, err := store.GetUpload(context.Background(), "aaa+AAA") + uploadA, err := store.GetUpload(context.Background(), "uploadA") assert.Nil(err) - uploadB, err := store.GetUpload(context.Background(), "bbb+BBB") + uploadB, err := store.GetUpload(context.Background(), "uploadB") assert.Nil(err) - uploadC, err := store.GetUpload(context.Background(), "ccc+CCC") + uploadC, err := store.GetUpload(context.Background(), "uploadC") assert.Nil(err) - // All uploads have a size smaller than the MinPartSize, so the files are downloaded for concatenation. - uploadA.(*s3Upload).info = &handler.FileInfo{Size: 3} - uploadB.(*s3Upload).info = &handler.FileInfo{Size: 4} - uploadC.(*s3Upload).info = &handler.FileInfo{Size: 5} - err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{ uploadA, uploadB, @@ -1439,7 +1397,7 @@ func TestWriteChunkCleansUpTempFiles(t *testing.T) { Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":14,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":14,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), }, nil) s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ Bucket: aws.String("bucket"), @@ -1456,7 +1414,7 @@ func TestWriteChunkCleansUpTempFiles(t *testing.T) { // No calls to s3obj.EXPECT().UploadPart since that is handled by s3APIWithTempFileAssertion - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) bytesRead, err := upload.WriteChunk(context.Background(), 0, bytes.NewReader([]byte("1234567890ABCD"))) @@ -1468,3 +1426,422 @@ func TestWriteChunkCleansUpTempFiles(t *testing.T) { assert.Nil(err) assert.Equal(len(files), 0) } + +// TestObjectPrefix asserts an entire upload flow when ObjectPrefix is set, +// including creating, resuming and finishing an upload. 
+func TestObjectPrefix(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + store.ObjectPrefix = "my/uploaded/files" + store.MinPartSize = 1 + + assert.Equal("bucket", store.Bucket) + assert.Equal(s3obj, store.Service) + + // For NewUpload + s3obj.EXPECT().CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId"), + Metadata: map[string]string{}, + }).Return(&s3.CreateMultipartUploadOutput{ + UploadId: aws.String("multipartId"), + }, nil) + s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId.info"), + Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`)), + ContentLength: aws.Int64(245), + }) + + // For WriteChunk + s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int32(1), + Body: bytes.NewReader([]byte("hello ")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-1"), + }, nil) + + // For GetUpload + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: nil, + }).Return(&s3.ListPartsOutput{ + Parts: []types.Part{ + { + PartNumber: aws.Int32(1), + Size: aws.Int64(6), + ETag: aws.String("etag-1"), + }, + }, + IsTruncated: aws.Bool(false), + }, nil) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId.part"), + }).Return(nil, &types.NoSuchKey{}) + + // For WriteChunk + s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int32(2), + Body: bytes.NewReader([]byte("world")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-2"), + }, nil) + + // For FinishUpload + s3obj.EXPECT().CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId"), + UploadId: aws.String("multipartId"), + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: []types.CompletedPart{ + { + ETag: aws.String("etag-1"), + PartNumber: aws.Int32(1), + }, + { + ETag: aws.String("etag-2"), + PartNumber: aws.Int32(2), + }, + }, + }, + }).Return(nil, nil) + + info1 := handler.FileInfo{ + ID: 
"uploadId", + Size: 11, + MetaData: map[string]string{}, + } + + // 1. Create upload + upload1, err := store.NewUpload(context.Background(), info1) + assert.Nil(err) + assert.NotNil(upload1) + + // 2. Write first chunk + bytesRead, err := upload1.WriteChunk(context.Background(), 0, bytes.NewReader([]byte("hello "))) + assert.Nil(err) + assert.Equal(int64(6), bytesRead) + + // 3. Fetch upload again + upload2, err := store.GetUpload(context.Background(), "uploadId") + assert.Nil(err) + assert.NotNil(upload2) + + // 4. Retrieve upload state + info2, err := upload2.GetInfo(context.Background()) + assert.Nil(err) + assert.Equal(int64(11), info2.Size) + assert.Equal(int64(6), info2.Offset) + assert.Equal("uploadId", info2.ID) + assert.Equal("my/uploaded/files/uploadId", info2.Storage["Key"]) + assert.Equal("multipartId", info2.Storage["MultipartUpload"]) + + // 5. Write second chunk + bytesRead, err = upload2.WriteChunk(context.Background(), 6, bytes.NewReader([]byte("world"))) + assert.Nil(err) + assert.Equal(int64(5), bytesRead) + + // 6. Complete upload + err = upload2.FinishUpload(context.Background()) + assert.Nil(err) +} + +// TestMetadataObjectPrefix asserts an entire upload flow when ObjectPrefix +// and MetadataObjectPrefix are set, including creating, resuming and finishing an upload. +func TestMetadataObjectPrefix(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + store.ObjectPrefix = "my/uploaded/files" + store.MetadataObjectPrefix = "my/metadata" + store.MinPartSize = 1 + + assert.Equal("bucket", store.Bucket) + assert.Equal(s3obj, store.Service) + + // For NewUpload + s3obj.EXPECT().CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId"), + Metadata: map[string]string{}, + }).Return(&s3.CreateMultipartUploadOutput{ + UploadId: aws.String("multipartId"), + }, nil) + s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/metadata/uploadId.info"), + Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`)), + ContentLength: aws.Int64(245), + }) + + // For WriteChunk + s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int32(1), + Body: bytes.NewReader([]byte("hello ")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-1"), + }, nil) + + // For GetUpload + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/metadata/uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId"), + UploadId: 
aws.String("multipartId"), + PartNumberMarker: nil, + }).Return(&s3.ListPartsOutput{ + Parts: []types.Part{ + { + PartNumber: aws.Int32(1), + Size: aws.Int64(6), + ETag: aws.String("etag-1"), + }, + }, + IsTruncated: aws.Bool(false), + }, nil) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/metadata/uploadId.part"), + }).Return(nil, &types.NoSuchKey{}) + + // For WriteChunk + s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int32(2), + Body: bytes.NewReader([]byte("world")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-2"), + }, nil) + + // For FinishUpload + s3obj.EXPECT().CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId"), + UploadId: aws.String("multipartId"), + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: []types.CompletedPart{ + { + ETag: aws.String("etag-1"), + PartNumber: aws.Int32(1), + }, + { + ETag: aws.String("etag-2"), + PartNumber: aws.Int32(2), + }, + }, + }, + }).Return(nil, nil) + + info1 := handler.FileInfo{ + ID: "uploadId", + Size: 11, + MetaData: map[string]string{}, + } + + // 1. Create upload + upload1, err := store.NewUpload(context.Background(), info1) + assert.Nil(err) + assert.NotNil(upload1) + + // 2. Write first chunk + bytesRead, err := upload1.WriteChunk(context.Background(), 0, bytes.NewReader([]byte("hello "))) + assert.Nil(err) + assert.Equal(int64(6), bytesRead) + + // 3. Fetch upload again + upload2, err := store.GetUpload(context.Background(), "uploadId") + assert.Nil(err) + assert.NotNil(upload2) + + // 4. Retrieve upload state + info2, err := upload2.GetInfo(context.Background()) + assert.Nil(err) + assert.Equal(int64(11), info2.Size) + assert.Equal(int64(6), info2.Offset) + assert.Equal("uploadId", info2.ID) + assert.Equal("my/uploaded/files/uploadId", info2.Storage["Key"]) + assert.Equal("multipartId", info2.Storage["MultipartUpload"]) + + // 5. Write second chunk + bytesRead, err = upload2.WriteChunk(context.Background(), 6, bytes.NewReader([]byte("world"))) + assert.Nil(err) + assert.Equal(int64(5), bytesRead) + + // 6. Complete upload + err = upload2.FinishUpload(context.Background()) + assert.Nil(err) +} + +// TestCustomKeyAndBucket asserts an entire upload flow when ObjectPrefix +// and MetadataObjectPrefix are set, including creating, resuming and finishing an upload. 
+func TestCustomKeyAndBucket(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + store.ObjectPrefix = "my/uploaded/files" + store.MinPartSize = 1 + + assert.Equal("bucket", store.Bucket) + assert.Equal(s3obj, store.Service) + + // For NewUpload + s3obj.EXPECT().CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{ + Bucket: aws.String("custom-bucket"), + Key: aws.String("my/uploaded/files/custom/key"), + Metadata: map[string]string{}, + }).Return(&s3.CreateMultipartUploadOutput{ + UploadId: aws.String("multipartId"), + }, nil) + s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId.info"), + Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"my/uploaded/files/custom/key","MultipartUpload":"multipartId","Type":"s3store"}}`)), + ContentLength: aws.Int64(254), + }) + + // For WriteChunk + s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("custom-bucket"), + Key: aws.String("my/uploaded/files/custom/key"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int32(1), + Body: bytes.NewReader([]byte("hello ")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-1"), + }, nil) + + // For GetUpload + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"my/uploaded/files/custom/key","MultipartUpload":"multipartId","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("custom-bucket"), + Key: aws.String("my/uploaded/files/custom/key"), + UploadId: aws.String("multipartId"), + PartNumberMarker: nil, + }).Return(&s3.ListPartsOutput{ + Parts: []types.Part{ + { + PartNumber: aws.Int32(1), + Size: aws.Int64(6), + ETag: aws.String("etag-1"), + }, + }, + IsTruncated: aws.Bool(false), + }, nil) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/uploaded/files/uploadId.part"), + }).Return(nil, &types.NoSuchKey{}) + + // For WriteChunk + s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("custom-bucket"), + Key: aws.String("my/uploaded/files/custom/key"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int32(2), + Body: bytes.NewReader([]byte("world")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-2"), + }, nil) + + // For FinishUpload + s3obj.EXPECT().CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{ + Bucket: aws.String("custom-bucket"), + Key: aws.String("my/uploaded/files/custom/key"), + UploadId: aws.String("multipartId"), + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: []types.CompletedPart{ + { + ETag: aws.String("etag-1"), + PartNumber: aws.Int32(1), + }, + { + ETag: aws.String("etag-2"), + PartNumber: aws.Int32(2), + }, + 
}, + }, + }).Return(nil, nil) + + info1 := handler.FileInfo{ + ID: "uploadId", + Size: 11, + MetaData: map[string]string{}, + Storage: map[string]string{ + "Key": "custom/key", + "Bucket": "custom-bucket", + }, + } + + // 1. Create upload + upload1, err := store.NewUpload(context.Background(), info1) + assert.Nil(err) + assert.NotNil(upload1) + + // 2. Write first chunk + bytesRead, err := upload1.WriteChunk(context.Background(), 0, bytes.NewReader([]byte("hello "))) + assert.Nil(err) + assert.Equal(int64(6), bytesRead) + + // 3. Fetch upload again + upload2, err := store.GetUpload(context.Background(), "uploadId") + assert.Nil(err) + assert.NotNil(upload2) + + // 4. Retrieve upload state + info2, err := upload2.GetInfo(context.Background()) + assert.Nil(err) + assert.Equal(int64(11), info2.Size) + assert.Equal(int64(6), info2.Offset) + assert.Equal("uploadId", info2.ID) + assert.Equal("my/uploaded/files/custom/key", info2.Storage["Key"]) + assert.Equal("multipartId", info2.Storage["MultipartUpload"]) + + // 5. Write second chunk + bytesRead, err = upload2.WriteChunk(context.Background(), 6, bytes.NewReader([]byte("world"))) + assert.Nil(err) + assert.Equal(int64(5), bytesRead) + + // 6. Complete upload + err = upload2.FinishUpload(context.Background()) + assert.Nil(err) +}