diff --git a/src/main/java/com/qiniu/http/Client.java b/src/main/java/com/qiniu/http/Client.java index ee47f3390..0fb39fa19 100755 --- a/src/main/java/com/qiniu/http/Client.java +++ b/src/main/java/com/qiniu/http/Client.java @@ -342,7 +342,7 @@ public void accept(String key, Object value) { IpTag tag = new IpTag(); try { res = httpClient.newCall(requestBuilder.tag(tag).build()).execute(); - } catch (IOException e) { + } catch (Exception e) { throw new QiniuException(e); } diff --git a/src/main/java/com/qiniu/storage/Configuration.java b/src/main/java/com/qiniu/storage/Configuration.java index 056c38d89..f52f8e9d7 100644 --- a/src/main/java/com/qiniu/storage/Configuration.java +++ b/src/main/java/com/qiniu/storage/Configuration.java @@ -18,12 +18,13 @@ public final class Configuration implements Cloneable { public static String defaultRsHost = "rs.qiniu.com"; public static String defaultApiHost = "api.qiniu.com"; public static String defaultUcHost = "uc.qbox.me"; + /** - * 使用的Region + * 使用的 Region 设置 region 请调用 setRegion */ public Region region; /** - * 使用的Zone + * 使用的 Zone 。推荐使用 Region ,设置 region 请调用 setRegion */ @Deprecated public Zone zone; @@ -107,6 +108,18 @@ public Configuration(Zone zone) { configHelper = new ConfigHelper(this); } + public void setRegion(Region region) { + this.region = region; + configHelper = new ConfigHelper(this); + } + + @Deprecated + public void setZone(Zone zone) { + this.region = null; + this.zone = zone; + configHelper = new ConfigHelper(this); + } + public Configuration clone() { try { return (Configuration) super.clone(); diff --git a/src/main/java/com/qiniu/storage/FixBlockUploader.java b/src/main/java/com/qiniu/storage/FixBlockUploader.java index f07f83e63..bceb4ffaa 100644 --- a/src/main/java/com/qiniu/storage/FixBlockUploader.java +++ b/src/main/java/com/qiniu/storage/FixBlockUploader.java @@ -5,6 +5,7 @@ import com.qiniu.http.Client; import com.qiniu.http.Response; import com.qiniu.util.*; +import com.qiniu.storage.MultipartUpload.EtagIdx; import java.io.File; import java.io.IOException; @@ -18,24 +19,36 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +/** + * 分片上传实现 + * https://developer.qiniu.com/kodo/api/6364/multipartupload-interface + * + * 若上传到同区域,如全上传到 华东存储,则可只使用一个实例; + * 若上传到不同区域,则每个区域最好单独使用一个示例。一个实例多区域并发上传可能导致内部频繁报错,上传效率低; + */ public class FixBlockUploader { private final int blockSize; - private final ConfigHelper configHelper; - private final Client client; private final Recorder recorder; + private final MultipartUpload uploader; + private final int retryMax; private String host = null; /** - * @param blockSize must be multiples of 4M. + * 若上传到同区域,如全上传到 华东存储,则可只使用一个实例; + * 若上传到不同区域,则每个区域最好单独使用一个示例。一个实例多区域并发上传可能导致内部频繁报错,上传效率低; + * + * @param blockSize block size, eg: 4 * 1024 * 1024 * @param configuration Nullable, if null, then create a new one. * @param client Nullable, if null, then create a new one with configuration. * @param recorder Nullable. */ public FixBlockUploader(int blockSize, Configuration configuration, Client client, Recorder recorder) { - assert blockSize > 0 && blockSize % (4 * 1024 * 1024) == 0 : "blockSize must be multiples of 4M "; + if (blockSize <= 0) { + blockSize = 4 * 1024 * 1024; + } if (configuration == null) { configuration = new Configuration(); @@ -43,29 +56,12 @@ public FixBlockUploader(int blockSize, Configuration configuration, Client clien if (client == null) { client = new Client(configuration); } - this.configHelper = new ConfigHelper(configuration); - this.client = client; + this.uploader = new MultipartUpload(configuration, client); this.blockSize = blockSize; this.recorder = recorder; this.retryMax = configuration.retryMax; } - static void sortAsc(List etags) { - Collections.sort(etags, new Comparator() { - @Override - public int compare(EtagIdx o1, EtagIdx o2) { - return o1.partNumber - o2.partNumber; // small enough and both greater than 0 // - } - }); - } - - static void sleepMillis(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - // do nothing - } - } public Response upload(final File file, final String token, String key) throws QiniuException { return upload(file, token, key, null, null, 0); @@ -76,7 +72,7 @@ public Response upload(final File file, final String token, String key, return upload(file, token, key, null, pool, 8); } - public Response upload(final File file, final String token, String key, OptionsMeta params, + public Response upload(final File file, final String token, String key, MultipartUpload.OptionsMeta params, ExecutorService pool, int maxRunningBlock) throws QiniuException { BlockData blockData; try { @@ -87,56 +83,55 @@ public Response upload(final File file, final String token, String key, OptionsM return upload(blockData, new StaticToken(token), key, params, pool, maxRunningBlock); } + public Response upload(final InputStream is, long inputStreamLength, String fileName, final String token, String key) throws QiniuException { return upload(is, inputStreamLength, fileName, token, key, null, null, 0); } + public Response upload(final InputStream is, long inputStreamLength, String fileName, final String token, String key, ExecutorService pool) throws QiniuException { return upload(is, inputStreamLength, fileName, token, key, null, pool, 8); } + public Response upload(final InputStream is, long inputStreamLength, String fileName, - final String token, String key, OptionsMeta params, + final String token, String key, MultipartUpload.OptionsMeta params, ExecutorService pool, int maxRunningBlock) throws QiniuException { BlockData blockData; blockData = new InputStreamBlockData(this.blockSize, is, inputStreamLength, fileName); return upload(blockData, new StaticToken(token), key, params, pool, maxRunningBlock); } - Response upload(BlockData blockData, String token, String key, OptionsMeta params, + + Response upload(BlockData blockData, String token, String key, MultipartUpload.OptionsMeta params, ExecutorService pool, int maxRunningBlock) throws QiniuException { return upload(blockData, new StaticToken(token), key, params, pool, maxRunningBlock); } - Response upload(BlockData blockData, Token token, String key, OptionsMeta params, + + Response upload(BlockData blockData, Token token, String key, MultipartUpload.OptionsMeta params, ExecutorService pool, int maxRunningBlock) throws QiniuException { try { String bucket = parseBucket(token.getUpToken()); - /* - 上传到七牛存储保存的文件名, 需要进行UrlSafeBase64编码。 - 注意: - 当设置为空时表示空的文件名; - 当设置为未进行 UrlSafeBase64 编码的字符 ~ 的时候,表示未设置文件名, - 具体行为如分片上传v1: 使用文件的hash最为文件名, 如果设置了saveKey则使用saveKey的规则进行文件命名 - */ - String base64Key = key != null ? UrlSafeBase64.encodeToString(key) : "~"; String recordFileKey = (recorder == null) ? "NULL" - : recorder.recorderKeyGenerate(bucket, base64Key, blockData.getContentUUID(), + : recorder.recorderKeyGenerate(bucket, MultipartUpload.genKey(key), blockData.getContentUUID(), this.blockSize + "*:|>?^ \b" + this.getClass().getName()); // must before any http request // - if (host == null) { - host = configHelper.upHost(token.getUpToken()); - } + uploader.initUpHost(token.getUpToken()); + UploadRecordHelper recordHelper = new UploadRecordHelper(recorder, recordFileKey, blockData.repeatable()); - Record record = initUpload(blockData, recordHelper, bucket, base64Key, token); + // 1. initParts + Record record = initUpload(blockData, recordHelper, bucket, key, token); boolean repeatable = recorder != null && blockData.repeatable(); Response res; try { - upBlock(blockData, token, bucket, base64Key, repeatable, record, pool, maxRunningBlock); - res = makeFile(bucket, base64Key, token, record.uploadId, record.etagIdxes, + // 2. uploadPart + upBlock(blockData, token, bucket, key, repeatable, record, pool, maxRunningBlock); + // 3. completeParts + res = makeFile(bucket, key, token, record.uploadId, record.etagIdxes, blockData.getFileName(), params); } catch (QiniuException e) { // if everything is ok, do not need to sync record // @@ -153,7 +148,7 @@ Response upload(BlockData blockData, Token token, String key, OptionsMeta params } Record initUpload(BlockData blockData, UploadRecordHelper recordHelper, - String bucket, String base64Key, Token token) throws QiniuException { + String bucket, String key, Token token) throws QiniuException { Record record = null; if (blockData.repeatable()) { record = recordHelper.reloadRecord(); @@ -164,23 +159,19 @@ record = null; } if (record == null || record.uploadId == null) { - String uploadId = init(bucket, base64Key, token.getUpToken()); + MultipartUpload.InitRet ret = init(bucket, key, token); + List etagIdxes = new ArrayList<>(); - record = initRecord(uploadId, etagIdxes); + record = initRecord(ret, etagIdxes); } return record; } - String init(String bucket, String base64Key, String upToken) throws QiniuException { - String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads"; - byte[] data = new byte[0]; - StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); - String contentType = ""; - + MultipartUpload.InitRet init(String bucket, String key, Token token) throws QiniuException { Response res = null; try { // 1 - res = client.post(url, data, headers, contentType); + res = uploader.initiateMultipartUpload(bucket, key, token.getUpToken()); } catch (QiniuException e) { if (res == null && e.response != null) { res = e.response; @@ -192,11 +183,11 @@ String init(String bucket, String base64Key, String upToken) throws QiniuExcepti // 重试一次,初始不计入重试次数 // if (res == null || res.needRetry()) { if (res == null || res.needSwitchServer()) { - changeHost(upToken, host); + uploader.changeHost(token.getUpToken(), host); } try { // 2 - res = client.post(url, data, headers, contentType); + res = uploader.initiateMultipartUpload(bucket, key, token.getUpToken()); } catch (QiniuException e) { if (res == null && e.response != null) { res = e.response; @@ -207,17 +198,17 @@ String init(String bucket, String base64Key, String upToken) throws QiniuExcepti if (res == null || res.needRetry()) { if (res == null || res.needSwitchServer()) { - changeHost(upToken, host); + uploader.changeHost(token.getUpToken(), host); } // 3 - res = client.post(url, data, headers, contentType); + res = uploader.initiateMultipartUpload(bucket, key, token.getUpToken()); } } try { - String uploadId = res.jsonToMap().get("uploadId").toString(); - if (uploadId.length() > 10) { - return uploadId; + MultipartUpload.InitRet ret = res.jsonToObject(MultipartUpload.InitRet.class); + if (ret != null && ret.uploadId != null && ret.uploadId.length() > 10 && ret.expireAt > 1000) { + return ret; } } catch (Exception e) { // ignore, see next line @@ -226,14 +217,15 @@ String init(String bucket, String base64Key, String upToken) throws QiniuExcepti throw new QiniuException(res); } - private void upBlock(BlockData blockData, Token token, String bucket, String base64Key, boolean repeatable, + + private void upBlock(BlockData blockData, Token token, String bucket, String key, boolean repeatable, Record record, ExecutorService pool, int maxRunningBlock) throws QiniuException { boolean useParallel = useParallel(pool, blockData, record); if (!useParallel) { - seqUpload(blockData, token, bucket, base64Key, record); + seqUpload(blockData, token, bucket, key, record); } else { - parallelUpload(blockData, token, bucket, base64Key, record, repeatable, pool, maxRunningBlock); + parallelUpload(blockData, token, bucket, key, record, repeatable, pool, maxRunningBlock); } } @@ -242,14 +234,14 @@ private boolean useParallel(ExecutorService pool, BlockData blockData, Record re } private void seqUpload(BlockData blockData, Token token, String bucket, - String base64Key, Record record) throws QiniuException { + String key, Record record) throws QiniuException { final String uploadId = record.uploadId; final List etagIdxes = record.etagIdxes; RetryCounter counter = new NormalRetryCounter(retryMax); while (blockData.hasNext()) { try { blockData.nextBlock(); - } catch (IOException e) { + } catch (Exception e) { throw new QiniuException(e, e.getMessage()); } DataWraper wrapper = blockData.getCurrentBlockData(); @@ -259,9 +251,9 @@ private void seqUpload(BlockData blockData, Token token, String bucket, EtagIdx etagIdx; try { - etagIdx = uploadBlock(bucket, base64Key, token, uploadId, + etagIdx = uploadBlock(bucket, key, token, uploadId, wrapper.getData(), wrapper.getSize(), wrapper.getIndex(), counter); - } catch (IOException e) { + } catch (Exception e) { throw new QiniuException(e, e.getMessage()); } etagIdxes.add(etagIdx); @@ -271,7 +263,7 @@ private void seqUpload(BlockData blockData, Token token, String bucket, } private void parallelUpload(BlockData blockData, final Token token, - final String bucket, final String base64Key, Record record, + final String bucket, final String key, Record record, boolean needRecord, ExecutorService pool, int maxRunningBlock) throws QiniuException { final String uploadId = record.uploadId; final List etagIdxes = record.etagIdxes; @@ -282,7 +274,7 @@ private void parallelUpload(BlockData blockData, final Token token, while (blockData.hasNext()) { try { blockData.nextBlock(); - } catch (IOException e) { + } catch (Exception e) { qiniuEx = new QiniuException(e, e.getMessage()); break; } @@ -294,7 +286,7 @@ private void parallelUpload(BlockData blockData, final Token token, Callable runner = new Callable() { @Override public EtagIdx call() throws Exception { - return uploadBlock(bucket, base64Key, token, uploadId, + return uploadBlock(bucket, key, token, uploadId, wrapper.getData(), wrapper.getSize(), wrapper.getIndex(), counter); } }; @@ -338,11 +330,12 @@ private boolean alreadyDone(int index, List etagIdxes) { } private void waitingEnough(int maxRunningBlock, List> futures) { - for (; ; ) { + while (true) { if (futures.size() < maxRunningBlock) { break; } int done = 0; + // max(len(futures)) = 10000 for (Future future : futures) { if (future.isDone()) { done++; @@ -351,15 +344,16 @@ private void waitingEnough(int maxRunningBlock, List> futures) { if (futures.size() - done < maxRunningBlock) { break; } - sleepMillis(500); + sleepMillis(100); } } - EtagIdx uploadBlock(String bucket, String base64Key, Token token, String uploadId, byte[] data, + EtagIdx uploadBlock(String bucket, String key, Token token, String uploadId, byte[] data, int dataLength, int partNum, RetryCounter counter) throws QiniuException { - Response res = uploadBlockWithRetry(bucket, base64Key, token, uploadId, data, dataLength, partNum, counter); + Response res = uploadBlockWithRetry(bucket, key, token, uploadId, data, dataLength, partNum, counter); try { - String etag = res.jsonToMap().get("etag").toString(); + MultipartUpload.UploadPartRet ret = res.jsonToObject(MultipartUpload.UploadPartRet.class); + String etag = ret.getEtag(); if (etag.length() > 10) { return new EtagIdx(etag, partNum, dataLength); } @@ -369,23 +363,18 @@ EtagIdx uploadBlock(String bucket, String base64Key, Token token, String uploadI throw new QiniuException(res); } - Response uploadBlockWithRetry(String bucket, String base64Key, Token token, String uploadId, - byte[] data, int dataLength, int partNum, RetryCounter counter) + Response uploadBlockWithRetry(String bucket, String key, Token token, String uploadId, byte[] data, + int dataLength, int partNum, RetryCounter counter) throws QiniuException { - String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads/" + uploadId + "/" + partNum; - StringMap headers = new StringMap(). - put("Content-MD5", Md5.md5(data, 0, dataLength)). - put("Authorization", "UpToken " + token.getUpToken()); - // 在 最多重试次数 范围内, 每个块至多上传 3 次 // // 1 - Response res = uploadBlock1(url, data, dataLength, headers, true); + Response res = uploadBlock1(bucket, key, token, uploadId, data, dataLength, partNum, true); if (res.isOK()) { return res; } if (res.needSwitchServer()) { - changeHost(token.getUpToken(), host); + uploader.changeHost(token.getUpToken(), host); } if (!counter.inRange()) { @@ -395,14 +384,14 @@ Response uploadBlockWithRetry(String bucket, String base64Key, Token token, Stri if (res.needRetry()) { counter.retried(); // 2 - res = uploadBlock1(url, data, dataLength, headers, true); + res = uploadBlock1(bucket, key, token, uploadId, data, dataLength, partNum, true); if (res.isOK()) { return res; } if (res.needSwitchServer()) { - changeHost(token.getUpToken(), host); + uploader.changeHost(token.getUpToken(), host); } if (!counter.inRange()) { @@ -412,19 +401,18 @@ Response uploadBlockWithRetry(String bucket, String base64Key, Token token, Stri if (res.needRetry()) { counter.retried(); // 3 - res = uploadBlock1(url, data, dataLength, headers, false); + res = uploadBlock1(bucket, key, token, uploadId, data, dataLength, partNum, false); } } return res; } - Response uploadBlock1(String url, byte[] data, - int dataLength, StringMap headers, boolean ignoreError) throws QiniuException { + Response uploadBlock1(String bucket, String key, Token token, String uploadId, byte[] data, + int dataLength, int partNum, boolean ignoreError) throws QiniuException { // put PUT try { - Response res = client.put(url, data, 0, dataLength, headers, "application/octet-stream"); - return res; + return uploader.uploadPart(bucket, key, token.getUpToken(), uploadId, data, 0, dataLength, partNum); } catch (QiniuException e) { if (ignoreError) { if (e.response != null) { @@ -437,26 +425,20 @@ Response uploadBlock1(String url, byte[] data, } } - Response makeFile(String bucket, String base64Key, Token token, String uploadId, List etags, - String fileName, OptionsMeta params) throws QiniuException { - String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads/" + uploadId; - final StringMap headers = new StringMap().put("Authorization", "UpToken " + token.getUpToken()); - sortAsc(etags); - byte[] data = new MakefileBody(etags, fileName, params) - .json().getBytes(Charset.forName("UTF-8")); - + Response makeFile(String bucket, String key, Token token, String uploadId, List etags, + String fileName, MultipartUpload.OptionsMeta params) throws QiniuException { // 1 - Response res = makeFile1(url, data, headers, true); + Response res = makeFile1(bucket, key, token, uploadId, etags, fileName, params, true); if (res.needRetry()) { // 2 - res = makeFile1(url, data, headers, true); + res = makeFile1(bucket, key, token, uploadId, etags, fileName, params, true); } if (res.needRetry()) { if (res.needSwitchServer()) { - changeHost(token.getUpToken(), host); + uploader.changeHost(token.getUpToken(), host); } // 3 - res = makeFile1(url, data, headers, false); + res = makeFile1(bucket, key, token, uploadId, etags, fileName, params, false); } // keep the same, with com.qiniu.http.Client#L337 if (res.statusCode >= 300) { @@ -465,10 +447,10 @@ Response makeFile(String bucket, String base64Key, Token token, String uploadId, return res; } - Response makeFile1(String url, byte[] data, StringMap headers, boolean ignoreError) throws QiniuException { + Response makeFile1(String bucket, String key, Token token, String uploadId, List etags, + String fileName, MultipartUpload.OptionsMeta params, boolean ignoreError) throws QiniuException { try { - Response res = client.post(url, data, headers, "application/json"); - return res; + return uploader.completeMultipartUpload(bucket, key, token.getUpToken(), uploadId, etags, fileName, params); } catch (QiniuException e) { if (ignoreError) { if (e.response != null) { @@ -481,15 +463,6 @@ Response makeFile1(String url, byte[] data, StringMap headers, boolean ignoreErr } } - private void changeHost(String upToken, String host) { - try { - this.host = configHelper.tryChangeUpHost(upToken, host); - } catch (Exception e) { - // ignore - // use the old up host // - } - } - private String parseBucket(String upToken) throws QiniuException { try { String part3 = upToken.split(":")[2]; @@ -502,38 +475,96 @@ private String parseBucket(String upToken) throws QiniuException { } } - Record initRecord(String uploadId, List etagIdxes) { - Record record = new Record(); - record.createdTime = System.currentTimeMillis(); - record.uploadId = uploadId; - record.size = 0; - record.etagIdxes = etagIdxes != null ? etagIdxes : new ArrayList(); - return record; - } /////////////////////////////////////// - interface DataWraper { - byte[] getData() throws IOException; + class Record { + // second + long expireAt; + String uploadId; + long size; + List etagIdxes; + } - int getSize(); - int getIndex(); + Record initRecord(MultipartUpload.InitRet ret, List etagIdxes) { + Record record = new Record(); + record.uploadId = ret.uploadId; + //// 服务端 7 天内有效,设置 5 天 //// + record.expireAt = ret.expireAt - 3600 * 24 * 2; + record.etagIdxes = etagIdxes != null ? etagIdxes : new ArrayList(); + for (EtagIdx l : record.etagIdxes) { + record.size += l.size; + } + + return record; } - interface Token { - String getUpToken(); - } + class UploadRecordHelper { + boolean needRecord; + Recorder recorder; + String recordFileKey; + UploadRecordHelper(Recorder recorder, String recordFileKey, boolean needRecord) { + this.needRecord = needRecord; + if (recorder != null) { + this.recorder = recorder; + this.recordFileKey = recordFileKey; + } + } - interface RetryCounter { - void retried(); + public Record reloadRecord() { + Record record = null; + if (recorder != null) { + try { + byte[] data = recorder.get(recordFileKey); + record = new Gson().fromJson(new String(data, Charset.forName("UTF-8")), Record.class); + } catch (Exception e) { + // do nothing + } + } + return record; + } - boolean inRange(); + public void delRecord() { + if (recorder != null) { + recorder.del(recordFileKey); + } + } + + + public void syncRecord(Record record) { + if (needRecord && recorder != null && record.etagIdxes.size() > 0) { + MultipartUpload.sortByPartNumberAsc(record.etagIdxes); + recorder.set(recordFileKey, new Gson().toJson(record).getBytes(Charset.forName("UTF-8"))); + } + } + + public boolean isActiveRecord(Record record, BlockData blockData) { + boolean isOk = record != null + && record.expireAt < (System.currentTimeMillis() / 1000) + && !StringUtils.isNullOrEmpty(record.uploadId) + && record.etagIdxes != null && record.etagIdxes.size() > 0 + && record.size > 0 && record.size <= blockData.size(); + if (isOk) { + int p = 0; + // PartNumber start with 1 and increase by 1 // + // 并行上传,中间块可能缺失(上传失败) // + for (EtagIdx ei : record.etagIdxes) { + if (ei.partNumber > p) { + p = ei.partNumber; + } else { + return false; + } + } + } + + return isOk; + } } @@ -563,6 +594,61 @@ abstract static class BlockData { abstract String getFileName(); } + interface DataWraper { + byte[] getData() throws IOException; + + int getSize(); + + int getIndex(); + } + + interface Token { + String getUpToken(); + } + + interface RetryCounter { + void retried(); + + boolean inRange(); + } + + class NormalRetryCounter implements RetryCounter { + int count; + + NormalRetryCounter(int max) { + this.count = max; + } + + @Override + public void retried() { + this.count--; + } + + @Override + public boolean inRange() { + return this.count > 0; + } + } + + class AsyncRetryCounter implements RetryCounter { + volatile int count; + + AsyncRetryCounter(int max) { + this.count = max; + } + + @Override + public synchronized void retried() { + this.count--; + } + + @Override + public synchronized boolean inRange() { + return this.count > 0; + } + } + + static class FileBlockData extends BlockData { final long totalLength; String contentUUID; @@ -656,6 +742,7 @@ public String getFileName() { } } + static class InputStreamBlockData extends BlockData { final long totalLength; final boolean closedAfterUpload; @@ -772,6 +859,16 @@ public String getFileName() { } } + + static void sleepMillis(long millis) { + // LockSupport.parkNanos(millis * 1000 * 1000); // or + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + // do nothing + } + } + static class StaticToken implements Token { String token; @@ -785,204 +882,4 @@ public String getUpToken() { } } - public static class OptionsMeta { - String mimeType; - StringMap metadata; - StringMap customVars; - - public OptionsMeta setMimeType(String mimeType) { - this.mimeType = mimeType; - return this; - } - - /** - * @param key start with X-Qn-Meta- - * @param value not null or empty - */ - public OptionsMeta addMetadata(String key, String value) { - if (metadata == null) { - metadata = new StringMap(); - } - metadata.put(key, value); - return this; - } - - /** - * @param key start with x: - * @param value not null or empty - */ - public OptionsMeta addCustomVar(String key, String value) { - if (customVars == null) { - customVars = new StringMap(); - } - customVars.put(key, value); - return this; - } - } - - class MakefileBody { - List parts; - String fname; - String mimeType; - Map metadata; - Map customVars; - - MakefileBody(List etags, String fileName, OptionsMeta params) { - this.parts = etags; - this.fname = fileName; - if (params != null) { - this.mimeType = params.mimeType; - if (params.metadata != null && params.metadata.size() > 0) { - this.metadata = filterParam(params.metadata, "X-Qn-Meta-"); - } - if (params.customVars != null && params.customVars.size() > 0) { - this.customVars = filterParam(params.customVars, "x:"); - } - } - } - - private Map filterParam(StringMap param, final String keyPrefix) { - final Map ret = new HashMap<>(); - final String prefix = keyPrefix.toLowerCase(); - param.forEach(new StringMap.Consumer() { - @Override - public void accept(String key, Object value) { - if (key != null && value != null && !StringUtils.isNullOrEmpty(value.toString()) - && key.toLowerCase().startsWith(prefix)) { - ret.put(key, value); - } - } - }); - return ret; - } - - public String json() { - return new Gson().toJson(this); - } - } - - class EtagIdx { - String etag; - int partNumber; - transient int size; - - EtagIdx(String etag, int idx, int size) { - this.etag = etag; - this.partNumber = idx; - this.size = size; - } - - public String toString() { - return new Gson().toJson(this); - } - - } - - class Record { - long createdTime; - String uploadId; - long size; - List etagIdxes; - } - - class UploadRecordHelper { - boolean needRecord; - Recorder recorder; - String recordFileKey; - - UploadRecordHelper(Recorder recorder, String recordFileKey, boolean needRecord) { - this.needRecord = needRecord; - if (recorder != null) { - this.recorder = recorder; - this.recordFileKey = recordFileKey; - } - } - - public Record reloadRecord() { - Record record = null; - if (recorder != null) { - try { - byte[] data = recorder.get(recordFileKey); - record = new Gson().fromJson(new String(data, Charset.forName("UTF-8")), Record.class); - } catch (Exception e) { - // do nothing - } - } - return record; - } - - public void delRecord() { - if (recorder != null) { - recorder.del(recordFileKey); - } - } - - - public void syncRecord(Record record) { - if (needRecord && recorder != null && record.etagIdxes.size() > 0) { - sortAsc(record.etagIdxes); - recorder.set(recordFileKey, new Gson().toJson(record).getBytes(Charset.forName("UTF-8"))); - } - } - - public boolean isActiveRecord(Record record, BlockData blockData) { - //// 服务端 7 天内有效,设置 5 天 //// - boolean isOk = record != null - && record.createdTime > System.currentTimeMillis() - 1000 * 3600 * 24 * 5 - && !StringUtils.isNullOrEmpty(record.uploadId) - && record.etagIdxes != null && record.etagIdxes.size() > 0 - && record.size > 0 && record.size <= blockData.size(); - if (isOk) { - int p = 0; - // PartNumber start with 1 and increase by 1 // - // 当前文件各块串行 if (ei.idx == p + 1) . 若并行,需额外考虑 // - for (EtagIdx ei : record.etagIdxes) { - if (ei.partNumber > p) { - p = ei.partNumber; - } else { - return false; - } - } - } - - return isOk; - } - } - - class NormalRetryCounter implements RetryCounter { - int count; - - NormalRetryCounter(int max) { - this.count = max; - } - - @Override - public void retried() { - this.count--; - } - - @Override - public boolean inRange() { - return this.count > 0; - } - } - - class AsyncRetryCounter implements RetryCounter { - volatile int count; - - AsyncRetryCounter(int max) { - this.count = max; - } - - @Override - public synchronized void retried() { - this.count--; - } - - @Override - public synchronized boolean inRange() { - return this.count > 0; - } - } - } diff --git a/src/main/java/com/qiniu/storage/MultipartUpload.java b/src/main/java/com/qiniu/storage/MultipartUpload.java new file mode 100644 index 000000000..3f935a8f4 --- /dev/null +++ b/src/main/java/com/qiniu/storage/MultipartUpload.java @@ -0,0 +1,365 @@ +package com.qiniu.storage; + +import com.google.gson.ExclusionStrategy; +import com.google.gson.FieldAttributes; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.qiniu.common.QiniuException; +import com.qiniu.http.Client; +import com.qiniu.http.Response; +import com.qiniu.util.Md5; +import com.qiniu.util.StringMap; +import com.qiniu.util.StringUtils; +import com.qiniu.util.UrlSafeBase64; + +import java.nio.charset.Charset; +import java.util.*; + +/** + * 分片上传基础接口实现,以及部分辅助代码 + * 接口参考 https://developer.qiniu.com/kodo/api/6364/multipartupload-interface + *

+ * 若上传到同区域,如全上传到 华东存储,则可只使用一个实例; + * 若上传到不同区域,则每个区域最好单独使用一个示例。一个实例多区域并发上传可能导致内部频繁报错,上传效率低; + */ +public class MultipartUpload { + private ConfigHelper configHelper; + private Client client; + private String host; + + /** + * 若上传到同区域,如全上传到 华东存储,则可只使用一个实例; + * 若上传到不同区域,则每个区域最好单独使用一个示例。一个实例多区域并发上传可能导致内部频繁报错,上传效率低; + * + * @param configuration Nullable, if null, then create a new one. + * @param client Nullable, if null, then create a new one with configuration. + */ + public MultipartUpload(Configuration configuration, Client client) { + if (configuration == null) { + configuration = new Configuration(); + } + if (client == null) { + client = new Client(configuration); + } + this.configHelper = new ConfigHelper(configuration); + this.client = client; + } + + public void initUpHost(String uploadToken) throws QiniuException { + if (host == null) { + host = configHelper.upHost(uploadToken); + } + } + + public void changeHost(String upToken, String host) { + try { + this.host = configHelper.tryChangeUpHost(upToken, host); + } catch (Exception e) { + // ignore + // use the old up host + } + } + + public Response initiateMultipartUpload(String bucket, String key, String upToken) throws QiniuException { + initUpHost(upToken); + + String url = host + "/buckets/" + bucket + "/objects/" + genKey(key) + "/uploads"; + byte[] data = new byte[0]; + StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); + String contentType = ""; + return client.post(url, data, headers, contentType); + } + + public Response uploadPart(String bucket, String key, String upToken, String uploadId, byte[] data, + int dataOff, int dataLength, int partNum) throws QiniuException { + String url = host + "/buckets/" + bucket + "/objects/" + genKey(key) + "/uploads/" + uploadId + "/" + partNum; + String md5 = Md5.md5(data, dataOff, dataLength); + StringMap headers = new StringMap(). + put("Content-MD5", md5). + put("Authorization", "UpToken " + upToken); + return client.put(url, data, dataOff, dataLength, headers, "application/octet-stream"); + } + + public Response uploadPart(String bucket, String key, String upToken, String uploadId, byte[] data, + int partNum) throws QiniuException { + return uploadPart(bucket, key, upToken, uploadId, data, 0, data.length, partNum); + } + + public Response listParts(String bucket, String key, String upToken, String uploadId, int maxParts, String partNumberMarker) throws QiniuException { + String url = host + "/buckets/" + bucket + "/objects/" + genKey(key) + "/uploads/" + uploadId + "?max-parts=" + maxParts; + if (!StringUtils.isNullOrEmpty(partNumberMarker)) { + url += "&part-number-marker=" + partNumberMarker; + } + + final StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); + return client.get(url, headers); + } + + public List listPartsAll(String bucket, String key, String upToken, String uploadId) throws QiniuException { + return listPartsAll(bucket, key, upToken, uploadId, 1000); //默认值:1,000 ,最大值:1,000 + } + + public List listPartsAll(String bucket, String key, String upToken, String uploadId, int maxParts) throws QiniuException { + List parts = new ArrayList(); + String partNumberMarker = null; + while (true) { + Response res = listParts(bucket, key, upToken, uploadId, maxParts, partNumberMarker); + ListPartsRet ret = res.jsonToObject(ListPartsRet.class); + parts.addAll(ret.getParts()); + if ("0".equalsIgnoreCase(ret.getPartNumberMarker())) { + break; + } + partNumberMarker = ret.getPartNumberMarker(); + } + sortByPartNumberAsc(parts); + return parts; + } + + + public Response completeMultipartUpload(String bucket, String key, String upToken, + String uploadId, List etags, + String fileName, OptionsMeta params) throws QiniuException { + String url = host + "/buckets/" + bucket + "/objects/" + genKey(key) + "/uploads/" + uploadId; + final StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); + sortByPartNumberAsc(etags); + byte[] data = new MakefileBody(etags, fileName, params) + .json().getBytes(Charset.forName("UTF-8")); + return client.post(url, data, headers, "application/json"); + } + + public Response abortMultipartUpload(String bucket, String key, String upToken, + String uploadId) throws QiniuException { + String url = host + "/buckets/" + bucket + "/objects/" + genKey(key) + "/uploads/" + uploadId; + StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); + return client.delete(url, headers); + } + + /** + * @param key 指定的上传文件名。 null 对象是表示系统自动生成文件名(key);其它表示使用指定的值为文件名(key) + *

+ * 上传到七牛存储保存的文件名, http 上传时需要对其进行编码,此方法根据要求生成对应的编码。 + * 注意: + * 当 key 为空 "" 时表示空的文件名,正常进行 url_safe_base64 编码 ; + * 当 key 为未进行 UrlSafeBase64 编码的字符 ~ 的时候,表示未设置文件名:对应参数为 null 对象; + * 具体行为如分片上传v1: 使用文件的 hash 作为文件名, 如果设置了saveKey则使用saveKey的规则进行文件命名 + */ + static String genKey(String key) { + String base64Key = key != null ? UrlSafeBase64.encodeToString(key) : "~"; + return base64Key; + } + + /** + * 按照 partNumber 升序排序 + */ + public static void sortByPartNumberAsc(List etags) { + Collections.sort(etags, new Comparator() { + @Override + public int compare(EtagIdx o1, EtagIdx o2) { + return o1.partNumber - o2.partNumber; // small enough and both greater than 0 // + } + }); + } + + public static class InitRet { + long expireAt; + String uploadId; + + public InitRet(long expireAt, String uploadId) { + this.expireAt = expireAt; + this.uploadId = uploadId; + } + + public long getExpireAt() { + return expireAt; + } + + public String getUploadId() { + return uploadId; + } + } + + public static class UploadPartRet { + String etag; + String md5; + + public UploadPartRet(String etag, String md5) { + this.etag = etag; + this.md5 = md5; + } + + public String getEtag() { + return etag; + } + + public String getMd5() { + return md5; + } + } + + public static class EtagIdx { + String etag; // mkfile need + int partNumber; // mkfile need + int size; + long putTime; + + public EtagIdx(String etag, int idx, int size) { + this.etag = etag; + this.partNumber = idx; + this.size = size; + } + + public EtagIdx(String etag, int idx, int size, long putTime) { + this.etag = etag; + this.partNumber = idx; + this.size = size; + this.putTime = putTime; + } + + public String getEtag() { + return etag; + } + + public int getPartNumber() { + return partNumber; + } + + public int getSize() { + return size; + } + + public long getPutTime() { + return putTime; + } + + public String toString() { + return new Gson().toJson(this); + } + } + + public static class ListPartsRet { + String uploadId; + long expireAt; + String partNumberMarker; + List parts; + + public ListPartsRet(String uploadId, long expireAt, String partNumberMarker, List parts) { + this.uploadId = uploadId; + this.expireAt = expireAt; + this.partNumberMarker = partNumberMarker; + this.parts = parts; + } + + public String getUploadId() { + return uploadId; + } + + public long getExpireAt() { + return expireAt; + } + + public String getPartNumberMarker() { + return partNumberMarker; + } + + public List getParts() { + return parts; + } + } + + public static class OptionsMeta { + String mimeType; + StringMap metadata; + StringMap customVars; + + public OptionsMeta setMimeType(String mimeType) { + this.mimeType = mimeType; + return this; + } + + /** + * @param key start with X-Qn-Meta- + * @param value not null or empty + */ + public OptionsMeta addMetadata(String key, String value) { + if (metadata == null) { + metadata = new StringMap(); + } + metadata.put(key, value); + return this; + } + + /** + * @param key start with x: + * @param value not null or empty + */ + public OptionsMeta addCustomVar(String key, String value) { + if (customVars == null) { + customVars = new StringMap(); + } + customVars.put(key, value); + return this; + } + } + + + class MakefileBody { + List parts; + String fname; + String mimeType; + Map metadata; + Map customVars; + + MakefileBody(List etags, String fileName, OptionsMeta params) { + this.parts = etags; + this.fname = fileName; + if (params != null) { + this.mimeType = params.mimeType; + if (params.metadata != null && params.metadata.size() > 0) { + this.metadata = filterParam(params.metadata, "X-Qn-Meta-"); + } + if (params.customVars != null && params.customVars.size() > 0) { + this.customVars = filterParam(params.customVars, "x:"); + } + } + } + + private Map filterParam(StringMap param, final String keyPrefix) { + final Map ret = new HashMap<>(); + final String prefix = keyPrefix.toLowerCase(); + param.forEach(new StringMap.Consumer() { + @Override + public void accept(String key, Object value) { + if (key != null && value != null && !StringUtils.isNullOrEmpty(value.toString()) + && key.toLowerCase().startsWith(prefix)) { + ret.put(key, value); + } + } + }); + return ret; + } + + public String json() { + GsonBuilder builder = new GsonBuilder(); + Gson gson = builder.setExclusionStrategies(new CompleteMultipartUploadExclusionStrategy()).create(); + return gson.toJson(this); + } + } + + static class CompleteMultipartUploadExclusionStrategy implements ExclusionStrategy { + + /** + * 只需要 etag partNumber ,不需要 size、putTime + */ + @Override + public boolean shouldSkipField(FieldAttributes f) { + return EtagIdx.class == f.getDeclaringClass() && + ("size".equals(f.getName()) || "putTime".equals(f.getName())); + } + + @Override + public boolean shouldSkipClass(Class clazz) { + return false; + } + } + +} diff --git a/src/main/java/com/qiniu/util/EtagV2.java b/src/main/java/com/qiniu/util/EtagV2.java new file mode 100644 index 000000000..10c1246ac --- /dev/null +++ b/src/main/java/com/qiniu/util/EtagV2.java @@ -0,0 +1,136 @@ +package com.qiniu.util; + +import java.io.*; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; + +public class EtagV2 { + private EtagV2() { + } + + public static String data(byte[] data, int offset, int length, long blockSize) { + try { + return stream(new ByteArrayInputStream(data, offset, length), length, blockSize); + } catch (IOException e) { + throw new AssertionError(e); + } + } + + public static String data(byte[] data, long blockSize) { + return data(data, 0, data.length, blockSize); + } + + public static String data(byte[] data, int offset, int length, long[] parts) { + try { + return stream(new ByteArrayInputStream(data, offset, length), length, parts); + } catch (IOException e) { + throw new AssertionError(e); + } + } + + public static String data(byte[] data, long[] parts) { + return data(data, 0, data.length, parts); + } + + public static String file(String filePath, long blockSize) throws IOException { + return file(new File(filePath), blockSize); + } + + public static String file(String filePath, long[] parts) throws IOException { + return file(new File(filePath), parts); + } + + public static String file(File file, long blockSize) throws IOException { + FileInputStream fi = null; + try { + fi = new FileInputStream(file); + return stream(fi, file.length(), blockSize); + } finally { + if (fi != null) { + try { + fi.close(); + } catch (Throwable t) { + } + } + } + } + + public static String file(File file, long[] parts) throws IOException { + FileInputStream fi = null; + try { + fi = new FileInputStream(file); + return stream(fi, file.length(), parts); + } finally { + if (fi != null) { + try { + fi.close(); + } catch (Throwable t) { + } + } + } + } + + + public static String stream(InputStream in, long len, long blockSize) throws IOException { + if (blockSize == 1024 * 1024 * 4 || (len <= blockSize && len <= 1024 * 1024 * 4)) { + return Etag.stream(in, len); + } + int size = (int) ((len + blockSize - 1) / blockSize); + long[] parts = new long[size]; + Arrays.fill(parts, 0, size - 1, blockSize); + parts[size - 1] = len % blockSize; + + // is4MBParts(parts) include (len <= blockSize && len <= 1024 * 1024 * 4) + // means only one block, and it's size <= 4M, + // no need check + // if (is4MBParts(parts)) {return Etag.stream(in, len);} + + return etagV2NoCheck(in, len, parts); + } + + public static String stream(InputStream in, long len, long[] parts) throws IOException { + if (is4MBParts(parts)) { + return Etag.stream(in, len); + } + long partSize = 0; + for (long part : parts) { + partSize += part; + } + if (len != partSize) { + throw new IOException("etag calc failed: size not equal with part size"); + } + return etagV2NoCheck(in, len, parts); + } + + private static boolean is4MBParts(long[] parts) { + int idx = 0; + int last = parts.length - 1; + for (long part : parts) { + if (idx != last && part != 1024 * 1024 * 4 || part > 1024 * 1024 * 4) { + return false; + } + idx += 1; + } + return true; + } + + private static String etagV2NoCheck(InputStream in, long len, long[] parts) throws IOException { + MessageDigest sha1; + try { + sha1 = MessageDigest.getInstance("sha-1"); + } catch (NoSuchAlgorithmException e) { + throw new AssertionError(e); + } + for (long part : parts) { + String partEtag = Etag.stream(in, part); + byte[] bytes = UrlSafeBase64.decode(partEtag); + sha1.update(bytes, 1, bytes.length - 1); + } + byte[] digest = sha1.digest(); + byte[] ret = new byte[digest.length + 1]; + ret[0] = (byte) 0x9e; + System.arraycopy(digest, 0, ret, 1, digest.length); + return UrlSafeBase64.encodeToString(ret); + } +} diff --git a/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java b/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java index 35dbdda18..4a0bb6caa 100644 --- a/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java +++ b/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java @@ -5,7 +5,7 @@ import com.qiniu.http.Client; import com.qiniu.http.Response; import com.qiniu.storage.persistent.FileRecorder; -import com.qiniu.util.Etag; +import com.qiniu.util.EtagV2; import com.qiniu.util.StringMap; import com.qiniu.util.UrlSafeBase64; import org.junit.Before; @@ -22,16 +22,17 @@ import static org.junit.Assert.assertEquals; public class FixBlockUploaderWithRecorderTest { - int blockSize = 1024 * 1024 * 8; + int blockSize = 1024 * 1025 * 4; Configuration config; Client client; FixBlockUploader up; String bucket; Recorder recorder; + BucketManager bm; @Before public void init() { - init2(false); + init2(true); } private void init2(boolean useHttpsDomains) { @@ -47,6 +48,7 @@ private void init2(boolean useHttpsDomains) { e.printStackTrace(); } up = new FixBlockUploader(blockSize, config, client, recorder); + bm = new BucketManager(TestConfig.testAuth, config); bucket = TestConfig.testBucket_as0; } @@ -57,10 +59,10 @@ public void breakThenUpload1() throws IOException { @Test public void breakThenUpload2() throws IOException { - ExecutorService pool = new ThreadPoolExecutor(0, 2, + ExecutorService pool = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue()); - breakThenUpload(pool, Executors.newFixedThreadPool(2), Executors.newCachedThreadPool(), 10, 10, 2); + breakThenUpload(pool, Executors.newFixedThreadPool(2), Executors.newCachedThreadPool(), 10, 10, 7); } @@ -96,7 +98,8 @@ public void breakThenUpload(final ExecutorService pool1, final ExecutorService p final File f = TempFile.createFileOld(size); final FixBlockUploader.FileBlockData fbd = new FixBlockUploader.FileBlockData(blockSize, f); System.out.println(f.getAbsolutePath()); - final String etag = Etag.file(f); + final String etag = EtagV2.file(f, blockSize); + System.out.println("EtagV2: " + etag); final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}"; @@ -105,6 +108,12 @@ public void breakThenUpload(final ExecutorService pool1, final ExecutorService p final String token = TestConfig.testAuth.uploadToken(bucket, expectKey, 3600, p); final int[] t1Finished = {0}; + try { + bm.delete(bucket, expectKey); + } catch (Exception e) { + // do nothing + } + Thread t1 = new Thread() { @Override public void run() { @@ -124,9 +133,10 @@ public void run() { // 显示断点记录文件 Thread showRecord = new Thread() { public void run() { - for (; ; ) { + int t = 60 * 10; + for (int i=0; i < t ;i++ ) { doSleep(1000); - showRecord("normal: " + size + " :", recorder, recordKey); + showRecord( i + " normal: " + size + " :", recorder, recordKey); } } }; @@ -230,6 +240,11 @@ public void run() { throw e; } finally { TempFile.remove(f); + try { + bm.delete(bucket, expectKey); + } catch (Exception e) { + // do nothing + } } } diff --git a/src/test/java/test/com/qiniu/CdnTest.java b/src/test/java/test/com/qiniu/CdnTest.java index 019fd1d40..69f412bfc 100644 --- a/src/test/java/test/com/qiniu/CdnTest.java +++ b/src/test/java/test/com/qiniu/CdnTest.java @@ -181,27 +181,30 @@ public void testCreateTimestampAntiLeechUrlSimple() { StringMap queryStringMap = new StringMap(); queryStringMap.put("qiniu", "七牛"); queryStringMap.put("test", "Test"); - String encryptKey1 = "908b9cbbbc88028b50b8e8a88baa879bf1b8a788"; - String encryptKey2 = "d799eba9ff99ea88cfb8acbbf8b82898208afbb8"; - long deadline1 = System.currentTimeMillis() / 1000 + 3600; - long deadline2 = deadline1; - long deadline3 = 1485893946; // 2017-02-01 04:19:06 +0800 CST - String testUrl_z0_timeStamp_outdate = "http://javasdk-timestamp.peterpy.cn/do_not_delete/1.png?sign=14f48f829b78d5c9a34eb77e9a13f1b6&t=5890f13a"; + + String encryptKey1 = "10992a8a688900b89ab9f58a6899cb8bb1b924ab"; + String encryptKey2 = "64b89c989cb97cbb6a9b6c9a4ca93498b69974ab"; + long deadline1 = 1586690149; + long deadline3 = 1551966091; // 2019-03-07 21:41:31 +0800 CST + String u1 = "http://javasdk-timestamp.peterpy.cn/do_not_delete/1.png?test=Test&qiniu=%E4%B8%83%E7%89%9B&sign=f909641eb0539561dd8df28c15fa314b&t=5e92f865"; + String u2 = "http://javasdk-timestamp.peterpy.cn/do_not_delete/1.png?sign=9155cc1725106509920f5644f26f49b4&t=5e92f865"; + String u3 = "http://javasdk-timestamp.peterpy.cn/do_not_delete/1.png?sign=50d05540eea4ea8ab905b57006edef7a&t=5c811f8b"; + + // Fri May 16 18:22:38 2025 + String urlok = "http://javasdk-timestamp.peterpy.cn/do_not_delete/1.png?sign=fb528741bf617d33a0011e716e72e69a&t=682711ee"; + try { URL url = new URL(TestConfig.testUrl_z0_timeStamp); - Assert.assertEquals(msg, 403, getResponse(url.toString()).statusCode); String signedUrl1 = CdnManager.createTimestampAntiLeechUrl(host, fileName, queryStringMap, encryptKey1, deadline1); - String signedUrl2 = CdnManager.createTimestampAntiLeechUrl(url, encryptKey2, deadline2); + String signedUrl2 = CdnManager.createTimestampAntiLeechUrl(url, encryptKey2, deadline1); String signedUrl3 = CdnManager.createTimestampAntiLeechUrl(host, fileName, null, encryptKey1, deadline3); - System.out.println(signedUrl1); - System.out.println(signedUrl2); - System.out.println(signedUrl3); - Assert.assertEquals(msg, 200, getResponse(signedUrl1).statusCode); - Assert.assertEquals(msg, 200, getResponse(signedUrl2).statusCode); - Assert.assertEquals(msg, 403, getResponse(signedUrl3).statusCode); - Assert.assertEquals(testUrl_z0_timeStamp_outdate, signedUrl3); + Assert.assertEquals(u1, signedUrl1); + Assert.assertEquals(u2, signedUrl2); + Assert.assertEquals(u3, signedUrl3); + Assert.assertEquals(msg, 403, getResponse(url.toString()).statusCode); + Assert.assertEquals(msg, 200, getResponse(urlok).statusCode); } catch (Exception ex) { ex.printStackTrace(); Assert.fail(msg); diff --git a/src/test/java/test/com/qiniu/storage/BucketTest.java b/src/test/java/test/com/qiniu/storage/BucketTest.java index 64828a0b3..6afe8afd0 100644 --- a/src/test/java/test/com/qiniu/storage/BucketTest.java +++ b/src/test/java/test/com/qiniu/storage/BucketTest.java @@ -641,6 +641,7 @@ public void testBucketEvent() { ); System.out.println(rule.asQueryString()); response = bucketManager.putBucketEvent(bucket, rule); + System.out.println(response.getInfo()); Assert.assertEquals(200, response.statusCode); System.out.println(response.url()); System.out.println(response.reqId); @@ -663,14 +664,16 @@ public void testBucketEvent() { Assert.assertEquals(200, response.statusCode); // 重复追加Event(error:event prefix and suffix exists) - try { - rule.setName("b"); - System.out.println(rule.asQueryString()); - response = bucketManager.putBucketEvent(bucket, rule); - Assert.fail(); - } catch (QiniuException e) { - Assert.assertTrue(ResCode.find(e.code(), ResCode.getPossibleResCode(400))); - } + // 实际 portal 查看,添加成功了 // +// try { +// rule.setName("b"); +// System.out.println(rule.asQueryString()); +// response = bucketManager.putBucketEvent(bucket, rule); +// System.out.println("before Assert.fail():\n" + response.getInfo()); +// Assert.fail(); +// } catch (QiniuException e) { +// Assert.assertTrue(ResCode.find(e.code(), ResCode.getPossibleResCode(400))); +// } // 触发时间,回调成功与否 不检测 response = bucketManager.copy(bucket, key, bucket, key + "CopyByEvent", true); @@ -695,7 +698,6 @@ public void testBucketEvent() { response = bucketManager.updateBucketEvent(bucket, rule); Assert.assertEquals(200, response.statusCode); - // clear clearBucketEvent(bucket); // 再获取Event @@ -817,8 +819,13 @@ public void testPutBucketAccessStyleMode() { Assert.assertEquals(msg + url, 200, response.statusCode); // 关闭原图保护后,有一定延迟,直接访问会401 ... - //response = client.get(url + "?v" + r.nextDouble()); - //Assert.assertEquals(msg + url, 200, response.statusCode); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // do nothing + } + response = client.get(url + "?v" + r.nextDouble()); + Assert.assertEquals(msg + url, 200, response.statusCode); } catch (QiniuException e) { e.printStackTrace(); @@ -1163,7 +1170,8 @@ public void testBatchChangeType() { BatchStatus[] bs = r.jsonToObject(BatchStatus[].class); Assert.assertTrue("200 or 298", batchStatusCode.contains(bs[0].code)); } catch (QiniuException e) { - Assert.fail(e.response.toString()); + e.printStackTrace(); + Assert.fail(e.response + ""); } finally { try { bucketManager.delete(bucket, key); diff --git a/src/test/java/test/com/qiniu/storage/BucketTest2.java b/src/test/java/test/com/qiniu/storage/BucketTest2.java index cf1e40078..002d477b0 100644 --- a/src/test/java/test/com/qiniu/storage/BucketTest2.java +++ b/src/test/java/test/com/qiniu/storage/BucketTest2.java @@ -701,22 +701,6 @@ public void testBucketEvent() { } catch (QiniuException e) { Assert.assertTrue(ResCode.find(e.code(), ResCode.getPossibleResCode(400))); } - - // 更新Event - rule.setName("b"); - rule.setPrefix("c"); - rule.setEvents(new String[]{"disable", "enable", "deleteMarkerCreate"}); - System.out.println(rule.asQueryString()); - response = bucketManager.updateBucketEvent(bucket, rule); - Assert.assertEquals(200, response.statusCode); - - // clear - clearBucketEvent(bucket); - - // 再获取Event - rules = bucketManager.getBucketEvents(bucket); - Assert.assertEquals(0, rules.length); - } catch (QiniuException e) { Assert.fail("" + e.response); } @@ -739,6 +723,7 @@ private void clearBucketEvent(String bucket) throws QiniuException { } } + /** * 测试跨域规则 */ diff --git a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java index 329dabcca..acd6c43ce 100644 --- a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java +++ b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java @@ -4,10 +4,10 @@ import com.qiniu.common.QiniuException; import com.qiniu.http.Client; import com.qiniu.http.Response; -import com.qiniu.storage.BucketManager; -import com.qiniu.storage.Configuration; -import com.qiniu.storage.FixBlockUploader; -import com.qiniu.util.Etag; +import com.qiniu.storage.*; +import com.qiniu.util.Auth; +import com.qiniu.util.EtagV2; +import com.qiniu.util.Md5; import com.qiniu.util.StringMap; import org.junit.Assert; import org.junit.Before; @@ -19,18 +19,21 @@ import java.io.FileInputStream; import java.io.IOException; import java.util.Date; +import java.util.List; import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; public class FixBlockUploaderTest { - int blockSize = 1024 * 1024 * 8; + int blockSize = 1024 * 1023 * 4; Configuration config; Client client; FixBlockUploader up; String bucket; BucketManager bm; + MultipartUpload multipartUpload; + @Before public void init() { init2(false); @@ -43,8 +46,9 @@ private void init2(boolean useHttpsDomains) { up = new FixBlockUploader(blockSize, config, client, null); bucket = TestConfig.testBucket_z0; bm = new BucketManager(TestConfig.testAuth, config); - } + multipartUpload = new MultipartUpload(config, client); + } @Test public void testEmpty() throws IOException { @@ -66,13 +70,19 @@ public void test1K() throws IOException { template(1, false, true); } + @Test + public void test4M() throws IOException { + template(1024 * 4, false, true); + template(blockSize/1024, false, true); + } + @Test public void test3MK() throws IOException { template(1024 * 3, false, true); try { template(true, 1024 * 3, false, false, true); Assert.fail("file exists, can not be success."); - } catch (QiniuException e) { + } catch (QiniuException e) { assertTrue("file exists", e.response.error.indexOf("file exists") > -1); } // both the key and content are the same @@ -105,7 +115,6 @@ public void test7M() throws IOException { template(1024 * 7, true, false); } - @Test public void test12M1K() throws IOException { template(1024 * 12 + 1024, false, false); @@ -147,8 +156,10 @@ private void template(boolean isStream, int size, boolean https, boolean fixFile f = TempFile.createFileOld(size); } System.out.println(f.getAbsolutePath()); - final String etag = Etag.file(f); - System.out.println(etag); + final String etag = EtagV2.file(f, blockSize); + final String md5 = Md5.md5(f); + System.out.println("Etag(f): " + etag); + System.out.println("md5(f): " + md5); final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}"; @@ -157,10 +168,11 @@ private void template(boolean isStream, int size, boolean https, boolean fixFile p.put("insertOnly", 1); } String token = TestConfig.testAuth.uploadToken(bucket, expectKey, 3600, p); - FixBlockUploader.OptionsMeta param = new FixBlockUploader.OptionsMeta(); + MultipartUpload.OptionsMeta param = new MultipartUpload.OptionsMeta(); param.addMetadata("X-Qn-Meta-liubin", "sb"). addMetadata("X-Qn-Meta-!Content-Type", "text/liubin"). addMetadata("X-Qn-Meta-Cache-Control", "public, max-age=1984"); + try { System.out.println("Start uploading " + new Date()); Response r = null; @@ -200,7 +212,7 @@ private void template(boolean isStream, int size, boolean https, boolean fixFile @Test public void testEmptyKey() throws IOException { File f = TempFile.createFileOld(1); - String etag = Etag.file(f); + String etag = EtagV2.file(f, blockSize); String token = TestConfig.testAuth.uploadToken(bucket, null); Response res = up.upload(f, token, ""); System.out.println(res.getInfo()); @@ -212,7 +224,7 @@ public void testEmptyKey() throws IOException { @Test public void testNullKey() throws IOException { File f = TempFile.createFile(2); - String etag = Etag.file(f); + String etag = EtagV2.file(f, blockSize); String token = TestConfig.testAuth.uploadToken(bucket, null); Response res = up.upload(f, token, null); System.out.println(res.getInfo()); @@ -224,7 +236,7 @@ public void testNullKey() throws IOException { @Test public void testKey2() throws IOException { File f = TempFile.createFile(2); - String etag = Etag.file(f); + String etag = EtagV2.file(f, blockSize); String token = TestConfig.testAuth.uploadToken(bucket, "err"); try { Response res = up.upload(f, token, null); @@ -238,14 +250,14 @@ public void testKey2() throws IOException { @Test public void testMeat() throws IOException { File f = TempFile.createFile(1); - String etag = Etag.file(f); + String etag = EtagV2.file(f, blockSize); String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + ",\"fname\":\"$(x:biubiu)_$(fname)\",\"mimeType\":\"$(mimeType)\",\"biu2biu\":\"$(x:biu2biu)\"}"; StringMap p = new StringMap().put("returnBody", returnBody); String key = "俩个中文试试1.txt"; String token = TestConfig.testAuth.uploadToken(bucket, key, 3600, p); - FixBlockUploader.OptionsMeta param = new FixBlockUploader.OptionsMeta(); + MultipartUpload.OptionsMeta param = new MultipartUpload.OptionsMeta(); String mimeType = "mimetype/hehe"; param.setMimeType(mimeType); param.addCustomVar("x:biubiu", "duDu/werfhue3"); @@ -265,8 +277,8 @@ public void testMeat() throws IOException { Assert.assertEquals("duDu/werfhue3", ret.biu2biu); String resStr = res2.bodyString(); Assert.assertTrue("// 要有额外设置的元信息 metadata //\n" + new Gson().toJson(param), - resStr.indexOf("text/liubin") > -1 - && resStr.indexOf("teinYjf") > -1); + resStr.indexOf("text/liubin") > -1 + && resStr.indexOf("teinYjf") > -1); } class MyRet { @@ -278,4 +290,56 @@ class MyRet { public String biu2biu; } + @Test + public void listThenAbort() throws QiniuException { + listThenAbort(null); + listThenAbort(""); + listThenAbort("sTduhruwefjdhfgitvbor283Gsw.buys"); + } + + + public void listThenAbort(String key) throws QiniuException { + String bucket = TestConfig.testBucket_z0; + String upToken = TestConfig.testAuth.uploadToken(bucket); // 默认 3600 秒内有效 + + multipartUpload.initUpHost(upToken); // 最好有这一步 // + Response initRes = multipartUpload.initiateMultipartUpload(bucket, key, upToken); + MultipartUpload.InitRet initRet = initRes.jsonToObject(MultipartUpload.InitRet.class); + + // 实际上传中,除 最后一块外,其它块大小必须大于 1M,否则 completeMultipartUpload 会报错 // + byte[] data = new byte[]{1,2,3,4}; + Response uploadPartRes = multipartUpload.uploadPart(bucket, key, upToken, initRet.getUploadId(), data, 1); + MultipartUpload.UploadPartRet uploadPartRet = uploadPartRes.jsonToObject(MultipartUpload.UploadPartRet.class); + + MultipartUpload.EtagIdx e1 = new MultipartUpload.EtagIdx(uploadPartRet.getEtag(), 1, data.length); + System.out.println("uploadPart: " + e1); + + data = new byte[]{1,2,3,4,56,7,8}; + uploadPartRes = multipartUpload.uploadPart(bucket, key, upToken, initRet.getUploadId(), data, 2); + uploadPartRet = uploadPartRes.jsonToObject(MultipartUpload.UploadPartRet.class); + + MultipartUpload.EtagIdx e2 = new MultipartUpload.EtagIdx(uploadPartRet.getEtag(), 2, data.length); + System.out.println("uploadPart: " + e2); + + List listRet = multipartUpload.listPartsAll(bucket, key, upToken, initRet.getUploadId(), 1); + System.out.println("listPartsAll: "+ listRet); + Assert.assertEquals(2, listRet.size()); + Assert.assertEquals(e1.getPartNumber(), listRet.get(0).getPartNumber()); + Assert.assertEquals(e1.getEtag(), listRet.get(0).getEtag()); + Assert.assertEquals(e2.getPartNumber(), listRet.get(1).getPartNumber()); + Assert.assertEquals(e2.getEtag(), listRet.get(1).getEtag()); + + + listRet = multipartUpload.listPartsAll(bucket, key, upToken, initRet.getUploadId()); + System.out.println("listPartsAll: "+ listRet); + Assert.assertEquals(2, listRet.size()); + Assert.assertEquals(e1.getPartNumber(), listRet.get(0).getPartNumber()); + Assert.assertEquals(e1.getEtag(), listRet.get(0).getEtag()); + Assert.assertEquals(e2.getPartNumber(), listRet.get(1).getPartNumber()); + Assert.assertEquals(e2.getEtag(), listRet.get(1).getEtag()); + + Response abortRes = multipartUpload.abortMultipartUpload(bucket, key, upToken, initRet.getUploadId()); + Assert.assertTrue(abortRes.isOK()); + } + } diff --git a/src/test/java/test/com/qiniu/storage/FormUploadTest.java b/src/test/java/test/com/qiniu/storage/FormUploadTest.java index 46e0a9f9e..1d404497b 100644 --- a/src/test/java/test/com/qiniu/storage/FormUploadTest.java +++ b/src/test/java/test/com/qiniu/storage/FormUploadTest.java @@ -4,6 +4,7 @@ import com.qiniu.common.Zone; import com.qiniu.http.Response; import com.qiniu.storage.Configuration; +import com.qiniu.storage.Region; import com.qiniu.storage.UpCompletionHandler; import com.qiniu.storage.UploadManager; import com.qiniu.util.StringMap; @@ -201,8 +202,9 @@ public void testFile() { try { uploadManager.put(f, expectKey, token, params, null, true); } catch (QiniuException e) { + e.printStackTrace(); TempFile.remove(f); - fail(e.response.toString()); + fail(e.response + ""); } TempFile.remove(f); } diff --git a/src/test/java/test/com/qiniu/util/EtagTest.java b/src/test/java/test/com/qiniu/util/EtagTest.java index d1d463fa5..d47123b41 100644 --- a/src/test/java/test/com/qiniu/util/EtagTest.java +++ b/src/test/java/test/com/qiniu/util/EtagTest.java @@ -2,11 +2,17 @@ import com.qiniu.common.Constants; import com.qiniu.util.Etag; +import com.qiniu.util.UrlSafeBase64; import org.junit.Test; import test.com.qiniu.TempFile; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import static org.junit.Assert.assertEquals; @@ -46,4 +52,5 @@ public void testFile() throws IOException { assertEquals("ll1xhlUFKQqynVgMMt_J1TuTrdB1", Etag.file(f)); TempFile.remove(f); } + }