@@ -130,32 +130,32 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
 	var complMultipartUpload completeMultipartUpload
 
 	// Declare a channel that sends the next part number to be uploaded.
-	// Buffered to 10000 because thats the maximum number of parts allowed
-	// by S3.
-	uploadPartsCh := make(chan uploadPartReq, 10000)
+	uploadPartsCh := make(chan uploadPartReq)
 
 	// Declare a channel that sends back the response of a part upload.
-	// Buffered to 10000 because thats the maximum number of parts allowed
-	// by S3.
-	uploadedPartsCh := make(chan uploadedPartRes, 10000)
+	uploadedPartsCh := make(chan uploadedPartRes)
 
 	// Used for readability, lastPartNumber is always totalPartsCount.
 	lastPartNumber := totalPartsCount
 
+	partitionCtx, partitionCancel := context.WithCancel(ctx)
+	defer partitionCancel()
 	// Send each part number to the channel to be processed.
-	for p := 1; p <= totalPartsCount; p++ {
-		uploadPartsCh <- uploadPartReq{PartNum: p}
-	}
-	close(uploadPartsCh)
-
-	partsBuf := make([][]byte, opts.getNumThreads())
-	for i := range partsBuf {
-		partsBuf[i] = make([]byte, 0, partSize)
-	}
+	go func() {
+		defer close(uploadPartsCh)
+
+		for p := 1; p <= totalPartsCount; p++ {
+			select {
+			case <-partitionCtx.Done():
+				return
+			case uploadPartsCh <- uploadPartReq{PartNum: p}:
+			}
+		}
+	}()
 
 	// Receive each part number from the channel allowing three parallel uploads.
 	for w := 1; w <= opts.getNumThreads(); w++ {
-		go func(w int, partSize int64) {
+		go func(partSize int64) {
 			for {
 				var uploadReq uploadPartReq
 				var ok bool
@@ -181,21 +181,11 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
 					partSize = lastPartSize
 				}
 
-				n, rerr := readFull(io.NewSectionReader(reader, readOffset, partSize), partsBuf[w-1][:partSize])
-				if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
-					uploadedPartsCh <- uploadedPartRes{
-						Error: rerr,
-					}
-					// Exit the goroutine.
-					return
-				}
-
-				// Get a section reader on a particular offset.
-				hookReader := newHook(bytes.NewReader(partsBuf[w-1][:n]), opts.Progress)
+				sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
 
 				// Proceed to upload the part.
 				objPart, err := c.uploadPart(ctx, bucketName, objectName,
-					uploadID, hookReader, uploadReq.PartNum,
+					uploadID, sectionReader, uploadReq.PartNum,
 					"", "", partSize,
 					opts.ServerSideEncryption,
 					!opts.DisableContentSha256,
@@ -218,7 +208,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
 					Part: uploadReq.Part,
 				}
 			}
-		}(w, partSize)
+		}(partSize)
 	}
 
 	// Gather the responses as they occur and update any
@@ -229,12 +219,12 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
 			return UploadInfo{}, ctx.Err()
 		case uploadRes := <-uploadedPartsCh:
 			if uploadRes.Error != nil {
+
 				return UploadInfo{}, uploadRes.Error
 			}
 
 			// Update the totalUploadedSize.
 			totalUploadedSize += uploadRes.Size
-			// Store the parts to be completed in order.
 			complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
 				ETag:       uploadRes.Part.ETag,
 				PartNumber: uploadRes.Part.PartNumber,
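
The first hunk replaces the pre-filled, 10000-slot buffered channel with an unbuffered channel fed by a dedicated goroutine under a cancellable context, so the feeder exits as soon as the upload aborts. Below is a minimal, standalone sketch of that producer/worker pattern, assuming illustrative names (`partReq`, `totalParts`, a `fmt`-based stand-in for the upload call) rather than minio-go's API:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type partReq struct{ PartNum int }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const totalParts = 10
	// Unbuffered: the producer blocks until a worker is ready for the next part.
	partsCh := make(chan partReq)

	// Producer: bails out if the context is cancelled, so it never leaks when
	// the caller returns early on an upload error.
	go func() {
		defer close(partsCh)
		for p := 1; p <= totalParts; p++ {
			select {
			case <-ctx.Done():
				return
			case partsCh <- partReq{PartNum: p}:
			}
		}
	}()

	// Workers: drain the channel until it is closed.
	var wg sync.WaitGroup
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for req := range partsCh {
				fmt.Println("uploading part", req.PartNum) // placeholder for the real upload
			}
		}()
	}
	wg.Wait()
}
```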
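The second hunk drops the per-worker staging buffers (`partsBuf`) and streams each part straight from the source through `io.NewSectionReader`, so memory no longer scales with partSize x numThreads. A rough sketch of that read path follows; `uploadPart` here is a hypothetical stand-in for the client method, not the real minio-go call:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// uploadPart pretends to send the part's bytes over the wire.
func uploadPart(r io.Reader, partNum int) error {
	n, err := io.Copy(io.Discard, r)
	fmt.Printf("part %d: %d bytes\n", partNum, n)
	return err
}

func main() {
	data := strings.NewReader("0123456789abcdefghij") // any io.ReaderAt works
	const partSize = 8
	total := int64(data.Len())

	for part, off := 1, int64(0); off < total; part, off = part+1, off+partSize {
		size := int64(partSize)
		if off+size > total {
			size = total - off // the last part is usually shorter
		}
		// SectionReader reads directly from the source at [off, off+size),
		// with no intermediate copy of the part.
		if err := uploadPart(io.NewSectionReader(data, off, size), part); err != nil {
			panic(err)
		}
	}
}
```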