From 33386c50b0a16868451cabd07da7054d603dfcb9 Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 23 Mar 2020 19:39:19 +0800 Subject: [PATCH 01/12] 1 --- .../com/qiniu/storage/FixBlockUploader.java | 50 +++++++++++-------- .../qiniu/storage/FixBlockUploaderTest.java | 12 +++++ 2 files changed, 42 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/qiniu/storage/FixBlockUploader.java b/src/main/java/com/qiniu/storage/FixBlockUploader.java index f9ad054d4..07563cf59 100644 --- a/src/main/java/com/qiniu/storage/FixBlockUploader.java +++ b/src/main/java/com/qiniu/storage/FixBlockUploader.java @@ -30,13 +30,13 @@ public class FixBlockUploader { private String host = null; /** - * @param blockSize must be multiples of 4M. + * @param blockSize must be >= 4 * 1024 * 1024. * @param configuration Nullable, if null, then create a new one. * @param client Nullable, if null, then create a new one with configuration. * @param recorder Nullable. */ public FixBlockUploader(int blockSize, Configuration configuration, Client client, Recorder recorder) { - assert blockSize > 0 && blockSize % (4 * 1024 * 1024) == 0 : "blockSize must be multiples of 4M "; + assert blockSize >= 4 * 1024 * 1024 : "blockSize must be >= 4M "; if (configuration == null) { configuration = new Configuration(); @@ -107,9 +107,9 @@ Response upload(BlockData blockData, Token token, String key, OptionsMeta params /* 上传到七牛存储保存的文件名, 需要进行UrlSafeBase64编码。 注意: - 当设置为空时表示空的文件名; - 当设置为未进行 UrlSafeBase64 编码的字符 ~ 的时候,表示未设置文件名, - 具体行为如分片上传v1: 使用文件的hash最为文件名, 如果设置了saveKey则使用saveKey的规则进行文件命名 + 当 key 为空 "" 时表示空的文件名,正常进行 url_safe_base64 编码; + 当 key 为未进行 UrlSafeBase64 编码的字符 ~ 的时候,表示未设置文件名, + 具体行为如分片上传v1: 使用文件的 hash 作为文件名, 如果设置了saveKey则使用saveKey的规则进行文件命名 */ String base64Key = key != null ? UrlSafeBase64.encodeToString(key) : "~"; String recordFileKey = (recorder == null) ? "NULL" @@ -120,12 +120,15 @@ Response upload(BlockData blockData, Token token, String key, OptionsMeta params host = configHelper.upHost(token.getUpToken()); } UploadRecordHelper recordHelper = new UploadRecordHelper(recorder, recordFileKey, blockData.repeatable()); + // 1. initParts Record record = initUpload(blockData, recordHelper, bucket, base64Key, token); boolean repeatable = recorder != null && blockData.repeatable(); Response res; try { + // 2. uploadPart upBlock(blockData, token, bucket, base64Key, repeatable, record, pool, maxRunningBlock); + // 3. completeParts res = makeFile(bucket, base64Key, token, record.uploadId, record.etagIdxes, blockData.getFileName(), params); } catch (QiniuException e) { @@ -154,14 +157,15 @@ record = null; } if (record == null || record.uploadId == null) { - String uploadId = init(bucket, base64Key, token.getUpToken()); + InitRet ret = init(bucket, base64Key, token.getUpToken()); + List etagIdxes = new ArrayList<>(); - record = initRecord(uploadId, etagIdxes); + record = initRecord(ret, etagIdxes); } return record; } - String init(String bucket, String base64Key, String upToken) throws QiniuException { + InitRet init(String bucket, String base64Key, String upToken) throws QiniuException { String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads"; byte[] data = new byte[0]; StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); @@ -205,9 +209,9 @@ String init(String bucket, String base64Key, String upToken) throws QiniuExcepti } try { - String uploadId = res.jsonToMap().get("uploadId").toString(); - if (uploadId.length() > 10) { - return uploadId; + InitRet ret = res.jsonToObject(InitRet.class); + if (ret != null && ret.uploadId != null && ret.uploadId.length() > 10 && ret.expireAt > 1000) { + return ret; } } catch (Exception e) { // ignore, see next line @@ -432,7 +436,7 @@ Response makeFile(String bucket, String base64Key, Token token, String uploadId, String fileName, OptionsMeta params) throws QiniuException { String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads/" + uploadId; final StringMap headers = new StringMap().put("Authorization", "UpToken " + token.getUpToken()); - sortAsc(etags); + sortByPartNumberAsc(etags); byte[] data = new MakefileBody(etags, fileName, params) .json().getBytes(Charset.forName("UTF-8")); @@ -493,7 +497,7 @@ private String parseBucket(String upToken) throws QiniuException { } } - static void sortAsc(List etags) { + static void sortByPartNumberAsc(List etags) { Collections.sort(etags, new Comparator() { @Override public int compare(EtagIdx o1, EtagIdx o2) { @@ -567,17 +571,19 @@ public String toString() { class Record { - long createdTime; + // second + long expireAt; String uploadId; long size; List etagIdxes; } - Record initRecord(String uploadId, List etagIdxes) { + Record initRecord(InitRet ret, List etagIdxes) { Record record = new Record(); - record.createdTime = System.currentTimeMillis(); - record.uploadId = uploadId; + record.uploadId = ret.uploadId; + //// 服务端 7 天内有效,设置 5 天 //// + record.expireAt = ret.expireAt - 3600 * 24 * 2; record.size = 0; record.etagIdxes = etagIdxes != null ? etagIdxes : new ArrayList(); @@ -620,15 +626,14 @@ public void delRecord() { public void syncRecord(Record record) { if (needRecord && recorder != null && record.etagIdxes.size() > 0) { - sortAsc(record.etagIdxes); + sortByPartNumberAsc(record.etagIdxes); recorder.set(recordFileKey, new Gson().toJson(record).getBytes(Charset.forName("UTF-8"))); } } public boolean isActiveRecord(Record record, BlockData blockData) { - //// 服务端 7 天内有效,设置 5 天 //// boolean isOk = record != null - && record.createdTime > System.currentTimeMillis() - 1000 * 3600 * 24 * 5 + && record.expireAt < (System.currentTimeMillis() / 1000) && !StringUtils.isNullOrEmpty(record.uploadId) && record.etagIdxes != null && record.etagIdxes.size() > 0 && record.size > 0 && record.size <= blockData.size(); @@ -652,6 +657,11 @@ public boolean isActiveRecord(Record record, BlockData blockData) { /////////////////////////////////////// + class InitRet { + String uploadId; + long expireAt; + } + abstract static class BlockData { protected final int blockDataSize; diff --git a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java index dac097c77..164f005bc 100644 --- a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java +++ b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java @@ -22,6 +22,7 @@ import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class FixBlockUploaderTest { int blockSize = 1024 * 1024 * 8; @@ -45,6 +46,17 @@ private void init2(boolean useHttpsDomains) { bm = new BucketManager(TestConfig.testAuth, config); } + @Test + public void testInit() { + Configuration config = new Configuration(); + Client client = new Client(config); + try { + new FixBlockUploader(1024 * 1024 * 3, config, client, null); + fail("block size must be >= 1024 * 1024 * 4"); + } catch (AssertionError e) { + assertTrue(e.getMessage().indexOf("blockSize must be >= 4M") > -1); + } + } @Test public void testEmpty() throws IOException { From f95c239928c0e9e5fdd344dc755b670dbe0c1eb8 Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 23 Mar 2020 19:41:10 +0800 Subject: [PATCH 02/12] 1 [ci skip] --- src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java index 164f005bc..b3ca0c402 100644 --- a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java +++ b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java @@ -290,5 +290,4 @@ class MyRet { public String mimeType; public String biu2biu; } - } From 07d314e0fb5b4934e3602d10261bddd10d9a2489 Mon Sep 17 00:00:00 2001 From: Sxci Date: Thu, 26 Mar 2020 17:39:09 +0800 Subject: [PATCH 03/12] remove block size check [ci skip] --- .../com/qiniu/storage/FixBlockUploader.java | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/qiniu/storage/FixBlockUploader.java b/src/main/java/com/qiniu/storage/FixBlockUploader.java index 07563cf59..834044b9d 100644 --- a/src/main/java/com/qiniu/storage/FixBlockUploader.java +++ b/src/main/java/com/qiniu/storage/FixBlockUploader.java @@ -12,7 +12,6 @@ import java.io.RandomAccessFile; import java.nio.charset.Charset; import java.util.*; - import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -30,13 +29,15 @@ public class FixBlockUploader { private String host = null; /** - * @param blockSize must be >= 4 * 1024 * 1024. + * @param blockSize block size, eg: 4 * 1024 * 1024 * @param configuration Nullable, if null, then create a new one. * @param client Nullable, if null, then create a new one with configuration. * @param recorder Nullable. */ public FixBlockUploader(int blockSize, Configuration configuration, Client client, Recorder recorder) { - assert blockSize >= 4 * 1024 * 1024 : "blockSize must be >= 4M "; + if (blockSize <= 0) { + blockSize = 4 * 1024 * 1024; + } if (configuration == null) { configuration = new Configuration(); @@ -209,7 +210,7 @@ InitRet init(String bucket, String base64Key, String upToken) throws QiniuExcept } try { - InitRet ret = res.jsonToObject(InitRet.class); + InitRet ret = res.jsonToObject(InitRet.class); if (ret != null && ret.uploadId != null && ret.uploadId.length() > 10 && ret.expireAt > 1000) { return ret; } @@ -237,7 +238,7 @@ private boolean useParallel(ExecutorService pool, BlockData blockData, Record re } private void seqUpload(BlockData blockData, Token token, String bucket, - String base64Key, Record record) throws QiniuException { + String base64Key, Record record) throws QiniuException { final String uploadId = record.uploadId; final List etagIdxes = record.etagIdxes; RetryCounter counter = new NormalRetryCounter(retryMax); @@ -266,8 +267,8 @@ private void seqUpload(BlockData blockData, Token token, String bucket, } private void parallelUpload(BlockData blockData, final Token token, - final String bucket, final String base64Key, Record record, - boolean needRecord, ExecutorService pool, int maxRunningBlock) throws QiniuException { + final String bucket, final String base64Key, Record record, + boolean needRecord, ExecutorService pool, int maxRunningBlock) throws QiniuException { final String uploadId = record.uploadId; final List etagIdxes = record.etagIdxes; final RetryCounter counter = new AsyncRetryCounter(retryMax); @@ -346,7 +347,7 @@ private void waitingEnough(int maxRunningBlock, List> futures) { if (futures.size() - done < maxRunningBlock) { break; } - sleepMillis(500); + sleepMillis(100); } } @@ -550,9 +551,9 @@ public String json() { class EtagIdx { - String etag; - int partNumber; - transient int size; + String etag; // mkfile + int partNumber; // mkfile + transient int size; // 本地使用,不写入 json 断点记录 // EtagIdx(String etag, int idx, int size) { this.etag = etag; @@ -574,7 +575,7 @@ class Record { // second long expireAt; String uploadId; - long size; + long size; // sum(current putParts size) EtagIdx.size 未序列化,历史断点记录没法获取到已上传大小 // List etagIdxes; } @@ -584,7 +585,7 @@ Record initRecord(InitRet ret, List etagIdxes) { record.uploadId = ret.uploadId; //// 服务端 7 天内有效,设置 5 天 //// record.expireAt = ret.expireAt - 3600 * 24 * 2; - record.size = 0; + record.size = 0; // 本次上传到需要写断点记录时上传的总大小,EtagIdx.size 未序列化,历史断点记录没法获取到已上传大小 // record.etagIdxes = etagIdxes != null ? etagIdxes : new ArrayList(); return record; @@ -640,7 +641,7 @@ public boolean isActiveRecord(Record record, BlockData blockData) { if (isOk) { int p = 0; // PartNumber start with 1 and increase by 1 // - // 当前文件各块串行 if (ei.idx == p + 1) . 若并行,需额外考虑 // + // 并行上传,中间块可能缺失(上传失败) // for (EtagIdx ei : record.etagIdxes) { if (ei.partNumber > p) { p = ei.partNumber; @@ -954,6 +955,7 @@ public String getFileName() { static void sleepMillis(long millis) { + // LockSupport.parkNanos(millis * 1000 * 1000); // or try { Thread.sleep(millis); } catch (InterruptedException e) { @@ -985,8 +987,8 @@ public OptionsMeta setMimeType(String mimeType) { } /** - * @param key start with X-Qn-Meta- - * @param value not null or empty + * @param key start with X-Qn-Meta- + * @param value not null or empty */ public OptionsMeta addMetadata(String key, String value) { if (metadata == null) { @@ -997,8 +999,8 @@ public OptionsMeta addMetadata(String key, String value) { } /** - * @param key start with x: - * @param value not null or empty + * @param key start with x: + * @param value not null or empty */ public OptionsMeta addCustomVar(String key, String value) { if (customVars == null) { From 56bb6fb2461ef015965b8a3b29e3e0076a361ea5 Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 30 Mar 2020 12:15:52 +0800 Subject: [PATCH 04/12] [ci skip] etag v2 --- .../com/qiniu/storage/FixBlockUploader.java | 26 +++-- .../FixBlockUploaderWithRecorderTest.java | 40 +++++-- .../qiniu/storage/FixBlockUploaderTest.java | 46 ++++---- .../java/test/com/qiniu/util/EtagTest.java | 104 ++++++++++++++++++ 4 files changed, 177 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/qiniu/storage/FixBlockUploader.java b/src/main/java/com/qiniu/storage/FixBlockUploader.java index 834044b9d..d14468ac6 100644 --- a/src/main/java/com/qiniu/storage/FixBlockUploader.java +++ b/src/main/java/com/qiniu/storage/FixBlockUploader.java @@ -334,11 +334,12 @@ private boolean alreadyDone(int index, List etagIdxes) { } private void waitingEnough(int maxRunningBlock, List> futures) { - for (; ; ) { + while (true) { if (futures.size() < maxRunningBlock) { break; } int done = 0; + // max(len(futures)) = 10000 for (Future future : futures) { if (future.isDone()) { done++; @@ -353,10 +354,18 @@ private void waitingEnough(int maxRunningBlock, List> futures) { EtagIdx uploadBlock(String bucket, String base64Key, Token token, String uploadId, byte[] data, int dataLength, int partNum, RetryCounter counter) throws QiniuException { - Response res = uploadBlockWithRetry(bucket, base64Key, token, uploadId, data, dataLength, partNum, counter); + String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads/" + uploadId + "/" + partNum; + String md5 = Md5.md5(data, 0, dataLength); + StringMap headers = new StringMap(). + put("Content-MD5", md5). + put("Authorization", "UpToken " + token.getUpToken()); + + Response res = uploadBlockWithRetry(url, token, headers, data, dataLength, counter); try { - String etag = res.jsonToMap().get("etag").toString(); - if (etag.length() > 10) { + StringMap m = res.jsonToMap(); + // String qMd5 = m.get("md5").toString(); + String etag = m.get("etag").toString(); + if (/*md5.equals(qMd5) && */etag.length() > 10) { return new EtagIdx(etag, partNum, dataLength); } } catch (Exception e) { @@ -365,14 +374,9 @@ EtagIdx uploadBlock(String bucket, String base64Key, Token token, String uploadI throw new QiniuException(res); } - Response uploadBlockWithRetry(String bucket, String base64Key, Token token, String uploadId, - byte[] data, int dataLength, int partNum, RetryCounter counter) + Response uploadBlockWithRetry(String url, Token token, StringMap headers, + byte[] data, int dataLength, RetryCounter counter) throws QiniuException { - String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads/" + uploadId + "/" + partNum; - StringMap headers = new StringMap(). - put("Content-MD5", Md5.md5(data, 0, dataLength)). - put("Authorization", "UpToken " + token.getUpToken()); - // 在 最多重试次数 范围内, 每个块至多上传 3 次 // // 1 Response res = uploadBlock1(url, data, dataLength, headers, true); diff --git a/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java b/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java index 022aca857..af3a9601c 100644 --- a/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java +++ b/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java @@ -12,6 +12,7 @@ import org.junit.Test; import test.com.qiniu.TempFile; import test.com.qiniu.TestConfig; +import test.com.qiniu.util.EtagTest; import java.io.File; import java.io.IOException; @@ -22,20 +23,31 @@ import static org.junit.Assert.assertEquals; public class FixBlockUploaderWithRecorderTest { - int blockSize = 1024 * 1024 * 8; + int blockSize = 1024 * 1025; Configuration config; Client client; FixBlockUploader up; String bucket; Recorder recorder; + BucketManager bm; @Before public void init() { - init2(false); + init2(true); } private void init2(boolean useHttpsDomains) { - config = new Configuration(); + Region rhttps = new Region.Builder(Region.region0()) + .accUpHost("up-dev.qiniu.io") + .srcUpHost("up-dev.qiniu.io") + .rsHost("rs-dev.qiniu.io") + .rsfHost("rsf-dev.qiniu.io").build(); + Region rhttp = new Region.Builder(Region.region0()) + .accUpHost("up.dev-kodo.dev.qiniu.io") + .srcUpHost("up.dev-kodo.dev.qiniu.io") + .rsHost("rs.dev-kodo.dev.qiniu.io") + .rsfHost("rsf.dev-kodo.dev.qiniu.io").build(); + config = new Configuration(useHttpsDomains ? rhttps : rhttp); config.useHttpsDomains = useHttpsDomains; client = new Client(config); @@ -47,7 +59,10 @@ private void init2(boolean useHttpsDomains) { e.printStackTrace(); } up = new FixBlockUploader(blockSize, config, client, recorder); + bm = new BucketManager(TestConfig.testAuth, config); bucket = TestConfig.testBucket_as0; + + bucket = "publicbucket_z0"; } @Test @@ -57,10 +72,10 @@ public void breakThenUpload1() throws IOException { @Test public void breakThenUpload2() throws IOException { - ExecutorService pool = new ThreadPoolExecutor(0, 2, + ExecutorService pool = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue()); - breakThenUpload(pool, Executors.newFixedThreadPool(2), Executors.newCachedThreadPool(), 10, 10, 2); + breakThenUpload(pool, Executors.newFixedThreadPool(2), Executors.newCachedThreadPool(), 10, 10, 7); } @@ -91,12 +106,12 @@ public void breakThenUpload(ExecutorService pool1, ExecutorService pool2, Execut public void breakThenUpload(final ExecutorService pool1, final ExecutorService pool2, final ExecutorService pool3, int upSecondsTime1, int upSecondsTime2, final int maxRunningBlock) throws IOException { - final long size = 1024 * 53 + 1039; + final long size = 1024 * 83 + 1039; final String expectKey = "\r\n?&r=" + size + "k" + System.currentTimeMillis(); final File f = TempFile.createFileOld(size); final FixBlockUploader.FileBlockData fbd = new FixBlockUploader.FileBlockData(blockSize, f); System.out.println(f.getAbsolutePath()); - final String etag = Etag.file(f); + final String etag = EtagTest.etagV2(f, blockSize); final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}"; @@ -105,6 +120,12 @@ public void breakThenUpload(final ExecutorService pool1, final ExecutorService p final String token = TestConfig.testAuth.uploadToken(bucket, expectKey, 3600, p); final int[] t1Finished = {0}; + try { + bm.delete(bucket, expectKey); + } catch (Exception e) { + // do nothing + } + Thread t1 = new Thread() { @Override public void run() { @@ -230,6 +251,11 @@ public void run() { throw e; } finally { TempFile.remove(f); + try { + bm.delete(bucket, expectKey); + } catch (Exception e) { + // do nothing + } } } diff --git a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java index b3ca0c402..079312977 100644 --- a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java +++ b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java @@ -7,13 +7,16 @@ import com.qiniu.storage.BucketManager; import com.qiniu.storage.Configuration; import com.qiniu.storage.FixBlockUploader; +import com.qiniu.storage.Region; import com.qiniu.util.Etag; +import com.qiniu.util.Md5; import com.qiniu.util.StringMap; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import test.com.qiniu.TempFile; import test.com.qiniu.TestConfig; +import test.com.qiniu.util.EtagTest; import java.io.File; import java.io.FileInputStream; @@ -22,10 +25,9 @@ import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; public class FixBlockUploaderTest { - int blockSize = 1024 * 1024 * 8; + int blockSize = 1024 * 1023 * 4; Configuration config; Client client; FixBlockUploader up; @@ -38,26 +40,26 @@ public void init() { } private void init2(boolean useHttpsDomains) { - config = new Configuration(); + Region r = useHttpsDomains + ? new Region.Builder(Region.region0()) + .accUpHost("up-dev.qiniu.io") + .srcUpHost("up-dev.qiniu.io") + .rsHost("rs-dev.qiniu.io") + .rsfHost("rsf-dev.qiniu.io").build() + : new Region.Builder(Region.region0()) + .accUpHost("up.dev-kodo.dev.qiniu.io") + .srcUpHost("up.dev-kodo.dev.qiniu.io") + .rsHost("rs.dev-kodo.dev.qiniu.io") + .rsfHost("rsf.dev-kodo.dev.qiniu.io").build(); + config = new Configuration(r); config.useHttpsDomains = useHttpsDomains; client = new Client(config); up = new FixBlockUploader(blockSize, config, client, null); bucket = TestConfig.testBucket_z0; + bucket = "publicbucket_z0"; bm = new BucketManager(TestConfig.testAuth, config); } - @Test - public void testInit() { - Configuration config = new Configuration(); - Client client = new Client(config); - try { - new FixBlockUploader(1024 * 1024 * 3, config, client, null); - fail("block size must be >= 1024 * 1024 * 4"); - } catch (AssertionError e) { - assertTrue(e.getMessage().indexOf("blockSize must be >= 4M") > -1); - } - } - @Test public void testEmpty() throws IOException { try { @@ -160,8 +162,9 @@ private void template(boolean isStream, int size, boolean https, boolean fixFile f = TempFile.createFileOld(size); } System.out.println(f.getAbsolutePath()); - final String etag = Etag.file(f); - System.out.println(etag); + final String etag = EtagTest.etagV2(f, blockSize); + final String md5 = Md5.md5(f); + System.out.println("Etag(f): " + etag); final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}"; @@ -174,6 +177,7 @@ private void template(boolean isStream, int size, boolean https, boolean fixFile param.addMetadata("X-Qn-Meta-liubin", "sb"). addMetadata("X-Qn-Meta-!Content-Type", "text/liubin"). addMetadata("X-Qn-Meta-Cache-Control", "public, max-age=1984"); + try { System.out.println("Start uploading " + new Date()); Response r = null; @@ -213,7 +217,7 @@ private void template(boolean isStream, int size, boolean https, boolean fixFile @Test public void testEmptyKey() throws IOException { File f = TempFile.createFileOld(1); - String etag = Etag.file(f); + String etag = EtagTest.etagV2(f); String token = TestConfig.testAuth.uploadToken(bucket, null); Response res = up.upload(f, token, ""); System.out.println(res.getInfo()); @@ -225,7 +229,7 @@ public void testEmptyKey() throws IOException { @Test public void testNullKey() throws IOException { File f = TempFile.createFile(2); - String etag = Etag.file(f); + String etag = EtagTest.etagV2(f); String token = TestConfig.testAuth.uploadToken(bucket, null); Response res = up.upload(f, token, null); System.out.println(res.getInfo()); @@ -237,7 +241,7 @@ public void testNullKey() throws IOException { @Test public void testKey2() throws IOException { File f = TempFile.createFile(2); - String etag = Etag.file(f); + String etag = EtagTest.etagV2(f); String token = TestConfig.testAuth.uploadToken(bucket, "err"); try { Response res = up.upload(f, token, null); @@ -251,7 +255,7 @@ public void testKey2() throws IOException { @Test public void testMeat() throws IOException { File f = TempFile.createFile(1); - String etag = Etag.file(f); + String etag = EtagTest.etagV2(f); String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + ",\"fname\":\"$(x:biubiu)_$(fname)\",\"mimeType\":\"$(mimeType)\",\"biu2biu\":\"$(x:biu2biu)\"}"; diff --git a/src/test/java/test/com/qiniu/util/EtagTest.java b/src/test/java/test/com/qiniu/util/EtagTest.java index d1d463fa5..dff67a62a 100644 --- a/src/test/java/test/com/qiniu/util/EtagTest.java +++ b/src/test/java/test/com/qiniu/util/EtagTest.java @@ -2,11 +2,17 @@ import com.qiniu.common.Constants; import com.qiniu.util.Etag; +import com.qiniu.util.UrlSafeBase64; import org.junit.Test; import test.com.qiniu.TempFile; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import static org.junit.Assert.assertEquals; @@ -46,4 +52,102 @@ public void testFile() throws IOException { assertEquals("ll1xhlUFKQqynVgMMt_J1TuTrdB1", Etag.file(f)); TempFile.remove(f); } + + public static String etagV2(File file) throws IOException { + return etagV2(file, 1024 * 1024 * 4); + } + + public static String etagV2(File file, long blockSize) throws IOException { + FileInputStream fi = null; + try { + fi = new FileInputStream(file); + return etagV2(fi, file.length(), blockSize); + } finally { + if (fi != null) { + try { + fi.close(); + } catch (Throwable t) { + } + } + } + } + + public static String etagV2(File file, long[] parts) throws IOException { + FileInputStream fi = null; + try { + fi = new FileInputStream(file); + return etagV2(fi, file.length(), parts); + } finally { + if (fi != null) { + try { + fi.close(); + } catch (Throwable t) { + } + } + } + } + + public static String etagV2(InputStream in, long len) throws IOException { + return etagV2(in, len, 1024 * 1024 * 4); + } + + public static String etagV2(InputStream in, long len, long blockSize) throws IOException { + if (blockSize == 1024 * 1024 * 4 || len <= blockSize) { + return Etag.stream(in, len); + } + int l = (int)((len + blockSize - 1) / blockSize); + long[] parts = new long[l]; + Arrays.fill(parts, blockSize); + parts[l-1] = len % blockSize; + + return etagV2NoCheck(in, len, parts); + } + + public static String etagV2(InputStream in, long len, long[] parts) throws IOException { + if (is4MBParts(parts)) { + return Etag.stream(in, len); + } + long partSize = 0; + for (long part : parts) { + partSize += part; + } + if (len != partSize) { + throw new IOException("etag calc failed: size not equal with part size"); + } + return etagV2NoCheck(in, len, parts); + } + + private static boolean is4MBParts(long[] parts) { + if (parts.length == 0) { + return true; + } + int idx = 0; + int last = parts.length - 1; + for (long part : parts) { + if (idx != last && part != 1024 * 1024 * 4 || part > 1024 * 1024 * 4) { + return false; + } + idx += 1; + } + return true; + } + + private static String etagV2NoCheck(InputStream in, long len, long[] parts) throws IOException { + MessageDigest sha1; + try { + sha1 = MessageDigest.getInstance("sha-1"); + } catch (NoSuchAlgorithmException e) { + throw new AssertionError(e); + } + for (long part : parts) { + String partEtag = Etag.stream(in, part); + byte[] bytes = UrlSafeBase64.decode(partEtag); + sha1.update(bytes, 1, bytes.length-1); + } + byte[] digest = sha1.digest(); + byte[] ret = new byte[digest.length + 1]; + ret[0] = (byte)0x9e; + System.arraycopy(digest, 0, ret, 1, digest.length); + return UrlSafeBase64.encodeToString(ret); + } } From 36796a8ac3850e3e120e35e6e6d78d0cc4e1d1c1 Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 30 Mar 2020 13:32:41 +0800 Subject: [PATCH 05/12] [ci skip] fix etag v2 --- src/test/java/test/com/qiniu/util/EtagTest.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/test/java/test/com/qiniu/util/EtagTest.java b/src/test/java/test/com/qiniu/util/EtagTest.java index dff67a62a..bf892ca3d 100644 --- a/src/test/java/test/com/qiniu/util/EtagTest.java +++ b/src/test/java/test/com/qiniu/util/EtagTest.java @@ -92,13 +92,16 @@ public static String etagV2(InputStream in, long len) throws IOException { } public static String etagV2(InputStream in, long len, long blockSize) throws IOException { - if (blockSize == 1024 * 1024 * 4 || len <= blockSize) { + if (blockSize == 1024 * 1024 * 4 || (len <= blockSize && len <= 1024 * 1024 * 4)) { return Etag.stream(in, len); } - int l = (int)((len + blockSize - 1) / blockSize); - long[] parts = new long[l]; + int size = (int) ((len + blockSize - 1) / blockSize); + long[] parts = new long[size]; Arrays.fill(parts, blockSize); - parts[l-1] = len % blockSize; + parts[size - 1] = len % blockSize; + + // is4MBParts(parts) include (len <= blockSize && len <= 1024 * 1024 * 4) + // means only one block, and it's size <= 4M return etagV2NoCheck(in, len, parts); } @@ -142,11 +145,11 @@ private static String etagV2NoCheck(InputStream in, long len, long[] parts) thro for (long part : parts) { String partEtag = Etag.stream(in, part); byte[] bytes = UrlSafeBase64.decode(partEtag); - sha1.update(bytes, 1, bytes.length-1); + sha1.update(bytes, 1, bytes.length - 1); } byte[] digest = sha1.digest(); byte[] ret = new byte[digest.length + 1]; - ret[0] = (byte)0x9e; + ret[0] = (byte) 0x9e; System.arraycopy(digest, 0, ret, 1, digest.length); return UrlSafeBase64.encodeToString(ret); } From 9a68b64ed40d2a2191589c33f4041acd60f29ba7 Mon Sep 17 00:00:00 2001 From: Sxci Date: Sun, 12 Apr 2020 19:22:32 +0800 Subject: [PATCH 06/12] add etag v2; fix some test --- src/main/java/com/qiniu/util/EtagV2.java | 136 ++++++++++++++++++ .../FixBlockUploaderWithRecorderTest.java | 22 +-- src/test/java/test/com/qiniu/CdnTest.java | 27 ++-- .../test/com/qiniu/storage/BucketTest.java | 28 ++-- .../test/com/qiniu/storage/BucketTest2.java | 128 ----------------- .../qiniu/storage/FixBlockUploaderTest.java | 36 ++--- .../com/qiniu/storage/FormUploadTest.java | 4 +- .../java/test/com/qiniu/util/EtagTest.java | 100 ------------- 8 files changed, 187 insertions(+), 294 deletions(-) create mode 100644 src/main/java/com/qiniu/util/EtagV2.java diff --git a/src/main/java/com/qiniu/util/EtagV2.java b/src/main/java/com/qiniu/util/EtagV2.java new file mode 100644 index 000000000..10c1246ac --- /dev/null +++ b/src/main/java/com/qiniu/util/EtagV2.java @@ -0,0 +1,136 @@ +package com.qiniu.util; + +import java.io.*; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; + +public class EtagV2 { + private EtagV2() { + } + + public static String data(byte[] data, int offset, int length, long blockSize) { + try { + return stream(new ByteArrayInputStream(data, offset, length), length, blockSize); + } catch (IOException e) { + throw new AssertionError(e); + } + } + + public static String data(byte[] data, long blockSize) { + return data(data, 0, data.length, blockSize); + } + + public static String data(byte[] data, int offset, int length, long[] parts) { + try { + return stream(new ByteArrayInputStream(data, offset, length), length, parts); + } catch (IOException e) { + throw new AssertionError(e); + } + } + + public static String data(byte[] data, long[] parts) { + return data(data, 0, data.length, parts); + } + + public static String file(String filePath, long blockSize) throws IOException { + return file(new File(filePath), blockSize); + } + + public static String file(String filePath, long[] parts) throws IOException { + return file(new File(filePath), parts); + } + + public static String file(File file, long blockSize) throws IOException { + FileInputStream fi = null; + try { + fi = new FileInputStream(file); + return stream(fi, file.length(), blockSize); + } finally { + if (fi != null) { + try { + fi.close(); + } catch (Throwable t) { + } + } + } + } + + public static String file(File file, long[] parts) throws IOException { + FileInputStream fi = null; + try { + fi = new FileInputStream(file); + return stream(fi, file.length(), parts); + } finally { + if (fi != null) { + try { + fi.close(); + } catch (Throwable t) { + } + } + } + } + + + public static String stream(InputStream in, long len, long blockSize) throws IOException { + if (blockSize == 1024 * 1024 * 4 || (len <= blockSize && len <= 1024 * 1024 * 4)) { + return Etag.stream(in, len); + } + int size = (int) ((len + blockSize - 1) / blockSize); + long[] parts = new long[size]; + Arrays.fill(parts, 0, size - 1, blockSize); + parts[size - 1] = len % blockSize; + + // is4MBParts(parts) include (len <= blockSize && len <= 1024 * 1024 * 4) + // means only one block, and it's size <= 4M, + // no need check + // if (is4MBParts(parts)) {return Etag.stream(in, len);} + + return etagV2NoCheck(in, len, parts); + } + + public static String stream(InputStream in, long len, long[] parts) throws IOException { + if (is4MBParts(parts)) { + return Etag.stream(in, len); + } + long partSize = 0; + for (long part : parts) { + partSize += part; + } + if (len != partSize) { + throw new IOException("etag calc failed: size not equal with part size"); + } + return etagV2NoCheck(in, len, parts); + } + + private static boolean is4MBParts(long[] parts) { + int idx = 0; + int last = parts.length - 1; + for (long part : parts) { + if (idx != last && part != 1024 * 1024 * 4 || part > 1024 * 1024 * 4) { + return false; + } + idx += 1; + } + return true; + } + + private static String etagV2NoCheck(InputStream in, long len, long[] parts) throws IOException { + MessageDigest sha1; + try { + sha1 = MessageDigest.getInstance("sha-1"); + } catch (NoSuchAlgorithmException e) { + throw new AssertionError(e); + } + for (long part : parts) { + String partEtag = Etag.stream(in, part); + byte[] bytes = UrlSafeBase64.decode(partEtag); + sha1.update(bytes, 1, bytes.length - 1); + } + byte[] digest = sha1.digest(); + byte[] ret = new byte[digest.length + 1]; + ret[0] = (byte) 0x9e; + System.arraycopy(digest, 0, ret, 1, digest.length); + return UrlSafeBase64.encodeToString(ret); + } +} diff --git a/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java b/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java index af3a9601c..149a1d98f 100644 --- a/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java +++ b/src/test/java/com/qiniu/storage/FixBlockUploaderWithRecorderTest.java @@ -5,14 +5,13 @@ import com.qiniu.http.Client; import com.qiniu.http.Response; import com.qiniu.storage.persistent.FileRecorder; -import com.qiniu.util.Etag; +import com.qiniu.util.EtagV2; import com.qiniu.util.StringMap; import com.qiniu.util.UrlSafeBase64; import org.junit.Before; import org.junit.Test; import test.com.qiniu.TempFile; import test.com.qiniu.TestConfig; -import test.com.qiniu.util.EtagTest; import java.io.File; import java.io.IOException; @@ -23,7 +22,7 @@ import static org.junit.Assert.assertEquals; public class FixBlockUploaderWithRecorderTest { - int blockSize = 1024 * 1025; + int blockSize = 1024 * 1025 * 4; Configuration config; Client client; FixBlockUploader up; @@ -37,17 +36,7 @@ public void init() { } private void init2(boolean useHttpsDomains) { - Region rhttps = new Region.Builder(Region.region0()) - .accUpHost("up-dev.qiniu.io") - .srcUpHost("up-dev.qiniu.io") - .rsHost("rs-dev.qiniu.io") - .rsfHost("rsf-dev.qiniu.io").build(); - Region rhttp = new Region.Builder(Region.region0()) - .accUpHost("up.dev-kodo.dev.qiniu.io") - .srcUpHost("up.dev-kodo.dev.qiniu.io") - .rsHost("rs.dev-kodo.dev.qiniu.io") - .rsfHost("rsf.dev-kodo.dev.qiniu.io").build(); - config = new Configuration(useHttpsDomains ? rhttps : rhttp); + config = new Configuration(); config.useHttpsDomains = useHttpsDomains; client = new Client(config); @@ -61,8 +50,6 @@ private void init2(boolean useHttpsDomains) { up = new FixBlockUploader(blockSize, config, client, recorder); bm = new BucketManager(TestConfig.testAuth, config); bucket = TestConfig.testBucket_as0; - - bucket = "publicbucket_z0"; } @Test @@ -111,7 +98,8 @@ public void breakThenUpload(final ExecutorService pool1, final ExecutorService p final File f = TempFile.createFileOld(size); final FixBlockUploader.FileBlockData fbd = new FixBlockUploader.FileBlockData(blockSize, f); System.out.println(f.getAbsolutePath()); - final String etag = EtagTest.etagV2(f, blockSize); + final String etag = EtagV2.file(f, blockSize); + System.out.println("EtagV2: " + etag); final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}"; diff --git a/src/test/java/test/com/qiniu/CdnTest.java b/src/test/java/test/com/qiniu/CdnTest.java index 6dbd4bd83..43fa6a556 100644 --- a/src/test/java/test/com/qiniu/CdnTest.java +++ b/src/test/java/test/com/qiniu/CdnTest.java @@ -183,27 +183,26 @@ public void testCreateTimestampAntiLeechUrlSimple() { queryStringMap.put("test", "Test"); String encryptKey1 = "10992a8a688900b89ab9f58a6899cb8bb1b924ab"; String encryptKey2 = "64b89c989cb97cbb6a9b6c9a4ca93498b69974ab"; - long deadline1 = System.currentTimeMillis() / 1000 + 3600; - long deadline2 = deadline1; + long deadline1 = 1586690149; long deadline3 = 1551966091; // 2019-03-07 21:41:31 +0800 CST - String testUrl_z0_timeStamp_outdate = - "http://javasdk-timestamp.peterpy.cn/do_not_delete/1.png?" - + "sign=50d05540eea4ea8ab905b57006edef7a&t=5c811f8b"; + String u1 = "http://javasdk-timestamp.peterpy.cn/do_not_delete/1.png?test=Test&qiniu=%E4%B8%83%E7%89%9B&sign=f909641eb0539561dd8df28c15fa314b&t=5e92f865"; + String u2 = "http://javasdk-timestamp.peterpy.cn/do_not_delete/1.png?sign=9155cc1725106509920f5644f26f49b4&t=5e92f865"; + String u3 = "http://javasdk-timestamp.peterpy.cn/do_not_delete/1.png?sign=50d05540eea4ea8ab905b57006edef7a&t=5c811f8b"; + + // Fri May 16 18:22:38 2025 + String urlok = "http://javasdk-timestamp.peterpy.cn/do_not_delete/1.png?sign=fb528741bf617d33a0011e716e72e69a&t=682711ee"; try { URL url = new URL(TestConfig.testUrl_z0_timeStamp); - Assert.assertEquals(msg, 403, getResponse(url.toString()).statusCode); String signedUrl1 = CdnManager.createTimestampAntiLeechUrl(host, fileName, queryStringMap, encryptKey1, deadline1); - String signedUrl2 = CdnManager.createTimestampAntiLeechUrl(url, encryptKey2, deadline2); + String signedUrl2 = CdnManager.createTimestampAntiLeechUrl(url, encryptKey2, deadline1); String signedUrl3 = CdnManager.createTimestampAntiLeechUrl(host, fileName, null, encryptKey1, deadline3); - System.out.println(signedUrl1); - System.out.println(signedUrl2); - System.out.println(signedUrl3); - Assert.assertEquals(msg, 200, getResponse(signedUrl1).statusCode); - Assert.assertEquals(msg, 200, getResponse(signedUrl2).statusCode); - Assert.assertEquals(msg, 403, getResponse(signedUrl3).statusCode); - Assert.assertEquals(testUrl_z0_timeStamp_outdate, signedUrl3); + Assert.assertEquals(u1, signedUrl1); + Assert.assertEquals(u2, signedUrl2); + Assert.assertEquals(u3, signedUrl3); + Assert.assertEquals(msg, 403, getResponse(url.toString()).statusCode); + Assert.assertEquals(msg, 200, getResponse(urlok).statusCode); } catch (Exception ex) { ex.printStackTrace(); Assert.fail(msg); diff --git a/src/test/java/test/com/qiniu/storage/BucketTest.java b/src/test/java/test/com/qiniu/storage/BucketTest.java index f97840262..233969a7b 100644 --- a/src/test/java/test/com/qiniu/storage/BucketTest.java +++ b/src/test/java/test/com/qiniu/storage/BucketTest.java @@ -619,6 +619,7 @@ public void testBucketEvent() { ); System.out.println(rule.asQueryString()); response = bucketManager.putBucketEvent(bucket, rule); + System.out.println(response.getInfo()); Assert.assertEquals(200, response.statusCode); System.out.println(response.url()); System.out.println(response.reqId); @@ -634,14 +635,16 @@ public void testBucketEvent() { } // 重复追加Event(error:event prefix and suffix exists) - try { - rule.setName("b"); - System.out.println(rule.asQueryString()); - response = bucketManager.putBucketEvent(bucket, rule); - Assert.fail(); - } catch (QiniuException e) { - Assert.assertTrue(ResCode.find(e.code(), ResCode.getPossibleResCode(400))); - } + // 实际 portal 查看,添加成功了 // +// try { +// rule.setName("b"); +// System.out.println(rule.asQueryString()); +// response = bucketManager.putBucketEvent(bucket, rule); +// System.out.println("before Assert.fail():\n" + response.getInfo()); +// Assert.fail(); +// } catch (QiniuException e) { +// Assert.assertTrue(ResCode.find(e.code(), ResCode.getPossibleResCode(400))); +// } // 追加Event rule.setName("b"); @@ -673,7 +676,6 @@ public void testBucketEvent() { response = bucketManager.updateBucketEvent(bucket, rule); Assert.assertEquals(200, response.statusCode); - // clear clearBucketEvent(bucket); // 再获取Event @@ -768,6 +770,7 @@ public void testPutBucketAccessStyleMode() { String msg = " 空间删除了访问域名,若测试,请先在空间绑定域名, "; String[] buckets = new String[]{TestConfig.testBucket_z0, TestConfig.testBucket_na0}; String[] urls = new String[]{TestConfig.testUrl_z0, TestConfig.testUrl_na0}; + Random r = new Random(); for (int i = 0; i < buckets.length; i++) { String bucket = buckets[i]; String url = urls[i]; @@ -778,7 +781,7 @@ public void testPutBucketAccessStyleMode() { response = bucketManager.putBucketAccessStyleMode(bucket, AccessStyleMode.OPEN); Assert.assertEquals(200, response.statusCode); try { - client.get(url); + client.get(url + "?liubin=" + r.nextFloat()); Assert.fail(msg + "should be 401"); } catch (QiniuException e) { Assert.assertEquals(msg, 401, e.response.statusCode); @@ -788,7 +791,7 @@ public void testPutBucketAccessStyleMode() { response = bucketManager.putBucketAccessStyleMode(bucket, AccessStyleMode.CLOSE); System.out.println(response); Assert.assertEquals(msg, 200, response.statusCode); - response = client.get(url); + response = client.get(url + "?liubin=" + r.nextFloat()); Assert.assertEquals(msg, 200, response.statusCode); } catch (QiniuException e) { @@ -1112,7 +1115,8 @@ public void testBatchChangeType() { BatchStatus[] bs = r.jsonToObject(BatchStatus[].class); Assert.assertTrue("200 or 298", batchStatusCode.contains(bs[0].code)); } catch (QiniuException e) { - Assert.fail(e.response.toString()); + e.printStackTrace(); + Assert.fail(e.response + ""); } finally { try { bucketManager.delete(bucket, key); diff --git a/src/test/java/test/com/qiniu/storage/BucketTest2.java b/src/test/java/test/com/qiniu/storage/BucketTest2.java index b4dd30988..5b9d19cb3 100644 --- a/src/test/java/test/com/qiniu/storage/BucketTest2.java +++ b/src/test/java/test/com/qiniu/storage/BucketTest2.java @@ -609,135 +609,7 @@ private void clearBucketLifeCycleRule(String bucket) throws QiniuException { } } - /** - * 测试事件通知 - */ - @Test - public void testBucketEvent() { - String[] buckets = new String[]{TestConfig.testBucket_z0}; - String[] keys = new String[]{TestConfig.testKey_z0, TestConfig.testKey_na0}; - for (int i = 0; i < buckets.length; i++) { - String bucket = buckets[i]; - String key = keys[i]; - Response response; - BucketEventRule rule; - BucketEventRule[] rules; - try { - // clear - clearBucketEvent(bucket); - - // 追加Event(invalid events) - try { - rule = new BucketEventRule("a", new String[]{}, new String[]{}); - System.out.println(rule.asQueryString()); - response = bucketManager.putBucketEvent(bucket, rule); - Assert.fail(); - } catch (QiniuException e) { - Assert.assertTrue(ResCode.find(e.code(), ResCode.getPossibleResCode(400))); - } - - // 追加Event(error:callbackURL must starts with http:// or https://) - try { - rule = new BucketEventRule("a", new String[]{"put", "mkfile"}, new String[]{}); - System.out.println(rule.asQueryString()); - response = bucketManager.putBucketEvent(bucket, rule); - Assert.fail(); - } catch (QiniuException e) { - Assert.assertTrue(ResCode.find(e.code(), ResCode.getPossibleResCode(400))); - } - - // 追加Event - rule = new BucketEventRule( - "a", - new String[]{"put", "mkfile", "delete", "copy", "move", "append", "disable", - "enable", "deleteMarkerCreate"}, - new String[]{"https://requestbin.fullcontact.com/1dsqext1?inspect", - "https://requestbin.fullcontact.com/160bunp1?inspect"} - ); - System.out.println(rule.asQueryString()); - response = bucketManager.putBucketEvent(bucket, rule); - Assert.assertEquals(200, response.statusCode); - System.out.println(response.url()); - System.out.println(response.reqId); - - // 重复追加Event(error:event name exists) - try { - rule.setName("a"); - System.out.println(rule.asQueryString()); - response = bucketManager.putBucketEvent(bucket, rule); - Assert.fail(); - } catch (QiniuException e) { - Assert.assertTrue(ResCode.find(e.code(), ResCode.getPossibleResCode(400))); - } - - // 重复追加Event(error:event prefix and suffix exists) - try { - rule.setName("b"); - System.out.println(rule.asQueryString()); - response = bucketManager.putBucketEvent(bucket, rule); - Assert.fail(); - } catch (QiniuException e) { - Assert.assertTrue(ResCode.find(e.code(), ResCode.getPossibleResCode(400))); - } - - // 追加Event - rule.setName("b"); - rule.setPrefix(key); - System.out.println(rule.asQueryString()); - response = bucketManager.putBucketEvent(bucket, rule); - Assert.assertEquals(200, response.statusCode); - - // 触发时间,回调成功与否 不检测 - response = bucketManager.copy(bucket, key, bucket, key + "CopyByEvent", true); - Assert.assertEquals(200, response.statusCode); - - // 更新Event(error:event name not found) - try { - rule.setName("c"); - rule.setPrefix("c"); - System.out.println(rule.asQueryString()); - response = bucketManager.updateBucketEvent(bucket, rule); - Assert.fail(); - } catch (QiniuException e) { - Assert.assertTrue(ResCode.find(e.code(), ResCode.getPossibleResCode(400))); - } - // 更新Event - rule.setName("b"); - rule.setPrefix("c"); - rule.setEvents(new String[]{"disable", "enable", "deleteMarkerCreate"}); - System.out.println(rule.asQueryString()); - response = bucketManager.updateBucketEvent(bucket, rule); - Assert.assertEquals(200, response.statusCode); - - // clear - clearBucketEvent(bucket); - - // 再获取Event - rules = bucketManager.getBucketEvents(bucket); - Assert.assertEquals(0, rules.length); - - } catch (QiniuException e) { - Assert.fail("" + e.response); - } - } - } - - private void clearBucketEvent(String bucket) throws QiniuException { - // 获取Event - BucketEventRule[] rules = bucketManager.getBucketEvents(bucket); - for (BucketEventRule r : rules) { - System.out.println("name=" + r.getName()); - System.out.println("prefix=" + r.getPrefix()); - System.out.println("suffix=" + r.getSuffix()); - System.out.println("event=" + Arrays.asList(r.getEvents()).toString()); - System.out.println("callbackUrls=" + Arrays.asList(r.getCallbackUrls()).toString()); - } - // 删除Event - for (BucketEventRule r : rules) { - bucketManager.deleteBucketEvent(bucket, r.getName()); - } - } /** * 测试跨域规则 diff --git a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java index 079312977..f9cea6d35 100644 --- a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java +++ b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java @@ -8,7 +8,7 @@ import com.qiniu.storage.Configuration; import com.qiniu.storage.FixBlockUploader; import com.qiniu.storage.Region; -import com.qiniu.util.Etag; +import com.qiniu.util.EtagV2; import com.qiniu.util.Md5; import com.qiniu.util.StringMap; import org.junit.Assert; @@ -16,7 +16,6 @@ import org.junit.Test; import test.com.qiniu.TempFile; import test.com.qiniu.TestConfig; -import test.com.qiniu.util.EtagTest; import java.io.File; import java.io.FileInputStream; @@ -40,23 +39,11 @@ public void init() { } private void init2(boolean useHttpsDomains) { - Region r = useHttpsDomains - ? new Region.Builder(Region.region0()) - .accUpHost("up-dev.qiniu.io") - .srcUpHost("up-dev.qiniu.io") - .rsHost("rs-dev.qiniu.io") - .rsfHost("rsf-dev.qiniu.io").build() - : new Region.Builder(Region.region0()) - .accUpHost("up.dev-kodo.dev.qiniu.io") - .srcUpHost("up.dev-kodo.dev.qiniu.io") - .rsHost("rs.dev-kodo.dev.qiniu.io") - .rsfHost("rsf.dev-kodo.dev.qiniu.io").build(); - config = new Configuration(r); + config = new Configuration(); config.useHttpsDomains = useHttpsDomains; client = new Client(config); up = new FixBlockUploader(blockSize, config, client, null); bucket = TestConfig.testBucket_z0; - bucket = "publicbucket_z0"; bm = new BucketManager(TestConfig.testAuth, config); } @@ -80,6 +67,12 @@ public void test1K() throws IOException { template(1, false, true); } + @Test + public void test4M() throws IOException { + template(1024 * 4, false, true); + template(blockSize/1024, false, true); + } + @Test public void test3MK() throws IOException { template(1024 * 3, false, true); @@ -119,8 +112,6 @@ public void test7M() throws IOException { template(1024 * 7, true, false); } - - @Test public void test12M1K() throws IOException { template(1024 * 12 + 1024, false, false); @@ -162,9 +153,10 @@ private void template(boolean isStream, int size, boolean https, boolean fixFile f = TempFile.createFileOld(size); } System.out.println(f.getAbsolutePath()); - final String etag = EtagTest.etagV2(f, blockSize); + final String etag = EtagV2.file(f, blockSize); final String md5 = Md5.md5(f); System.out.println("Etag(f): " + etag); + System.out.println("md5(f): " + md5); final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}"; @@ -217,7 +209,7 @@ private void template(boolean isStream, int size, boolean https, boolean fixFile @Test public void testEmptyKey() throws IOException { File f = TempFile.createFileOld(1); - String etag = EtagTest.etagV2(f); + String etag = EtagV2.file(f, blockSize); String token = TestConfig.testAuth.uploadToken(bucket, null); Response res = up.upload(f, token, ""); System.out.println(res.getInfo()); @@ -229,7 +221,7 @@ public void testEmptyKey() throws IOException { @Test public void testNullKey() throws IOException { File f = TempFile.createFile(2); - String etag = EtagTest.etagV2(f); + String etag = EtagV2.file(f, blockSize); String token = TestConfig.testAuth.uploadToken(bucket, null); Response res = up.upload(f, token, null); System.out.println(res.getInfo()); @@ -241,7 +233,7 @@ public void testNullKey() throws IOException { @Test public void testKey2() throws IOException { File f = TempFile.createFile(2); - String etag = EtagTest.etagV2(f); + String etag = EtagV2.file(f, blockSize); String token = TestConfig.testAuth.uploadToken(bucket, "err"); try { Response res = up.upload(f, token, null); @@ -255,7 +247,7 @@ public void testKey2() throws IOException { @Test public void testMeat() throws IOException { File f = TempFile.createFile(1); - String etag = EtagTest.etagV2(f); + String etag = EtagV2.file(f, blockSize); String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + ",\"fname\":\"$(x:biubiu)_$(fname)\",\"mimeType\":\"$(mimeType)\",\"biu2biu\":\"$(x:biu2biu)\"}"; diff --git a/src/test/java/test/com/qiniu/storage/FormUploadTest.java b/src/test/java/test/com/qiniu/storage/FormUploadTest.java index 9d9bd343f..01e1af44a 100644 --- a/src/test/java/test/com/qiniu/storage/FormUploadTest.java +++ b/src/test/java/test/com/qiniu/storage/FormUploadTest.java @@ -4,6 +4,7 @@ import com.qiniu.common.Zone; import com.qiniu.http.Response; import com.qiniu.storage.Configuration; +import com.qiniu.storage.Region; import com.qiniu.storage.UpCompletionHandler; import com.qiniu.storage.UploadManager; import com.qiniu.util.StringMap; @@ -201,8 +202,9 @@ public void testFile() { try { uploadManager.put(f, expectKey, token, params, null, true); } catch (QiniuException e) { + e.printStackTrace(); TempFile.remove(f); - fail(e.response.toString()); + fail(e.response + ""); } TempFile.remove(f); } diff --git a/src/test/java/test/com/qiniu/util/EtagTest.java b/src/test/java/test/com/qiniu/util/EtagTest.java index bf892ca3d..d47123b41 100644 --- a/src/test/java/test/com/qiniu/util/EtagTest.java +++ b/src/test/java/test/com/qiniu/util/EtagTest.java @@ -53,104 +53,4 @@ public void testFile() throws IOException { TempFile.remove(f); } - public static String etagV2(File file) throws IOException { - return etagV2(file, 1024 * 1024 * 4); - } - - public static String etagV2(File file, long blockSize) throws IOException { - FileInputStream fi = null; - try { - fi = new FileInputStream(file); - return etagV2(fi, file.length(), blockSize); - } finally { - if (fi != null) { - try { - fi.close(); - } catch (Throwable t) { - } - } - } - } - - public static String etagV2(File file, long[] parts) throws IOException { - FileInputStream fi = null; - try { - fi = new FileInputStream(file); - return etagV2(fi, file.length(), parts); - } finally { - if (fi != null) { - try { - fi.close(); - } catch (Throwable t) { - } - } - } - } - - public static String etagV2(InputStream in, long len) throws IOException { - return etagV2(in, len, 1024 * 1024 * 4); - } - - public static String etagV2(InputStream in, long len, long blockSize) throws IOException { - if (blockSize == 1024 * 1024 * 4 || (len <= blockSize && len <= 1024 * 1024 * 4)) { - return Etag.stream(in, len); - } - int size = (int) ((len + blockSize - 1) / blockSize); - long[] parts = new long[size]; - Arrays.fill(parts, blockSize); - parts[size - 1] = len % blockSize; - - // is4MBParts(parts) include (len <= blockSize && len <= 1024 * 1024 * 4) - // means only one block, and it's size <= 4M - - return etagV2NoCheck(in, len, parts); - } - - public static String etagV2(InputStream in, long len, long[] parts) throws IOException { - if (is4MBParts(parts)) { - return Etag.stream(in, len); - } - long partSize = 0; - for (long part : parts) { - partSize += part; - } - if (len != partSize) { - throw new IOException("etag calc failed: size not equal with part size"); - } - return etagV2NoCheck(in, len, parts); - } - - private static boolean is4MBParts(long[] parts) { - if (parts.length == 0) { - return true; - } - int idx = 0; - int last = parts.length - 1; - for (long part : parts) { - if (idx != last && part != 1024 * 1024 * 4 || part > 1024 * 1024 * 4) { - return false; - } - idx += 1; - } - return true; - } - - private static String etagV2NoCheck(InputStream in, long len, long[] parts) throws IOException { - MessageDigest sha1; - try { - sha1 = MessageDigest.getInstance("sha-1"); - } catch (NoSuchAlgorithmException e) { - throw new AssertionError(e); - } - for (long part : parts) { - String partEtag = Etag.stream(in, part); - byte[] bytes = UrlSafeBase64.decode(partEtag); - sha1.update(bytes, 1, bytes.length - 1); - } - byte[] digest = sha1.digest(); - byte[] ret = new byte[digest.length + 1]; - ret[0] = (byte) 0x9e; - System.arraycopy(digest, 0, ret, 1, digest.length); - return UrlSafeBase64.encodeToString(ret); - } } From b2cb2f97a4f0f8cfd1fc0f24928510b16ece7762 Mon Sep 17 00:00:00 2001 From: Sxci Date: Sat, 9 May 2020 12:01:19 +0800 Subject: [PATCH 07/12] update durction --- src/main/java/com/qiniu/http/Client.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/qiniu/http/Client.java b/src/main/java/com/qiniu/http/Client.java index 535ea0823..ee47f3390 100755 --- a/src/main/java/com/qiniu/http/Client.java +++ b/src/main/java/com/qiniu/http/Client.java @@ -339,7 +339,6 @@ public void accept(String key, Object value) { long start = System.currentTimeMillis(); okhttp3.Response res = null; Response r; - double duration = (System.currentTimeMillis() - start) / 1000.0; IpTag tag = new IpTag(); try { res = httpClient.newCall(requestBuilder.tag(tag).build()).execute(); @@ -347,6 +346,7 @@ public void accept(String key, Object value) { throw new QiniuException(e); } + double duration = (System.currentTimeMillis() - start) / 1000.0; r = Response.create(res, tag.ip, duration); if (r.statusCode >= 300) { throw new QiniuException(r); From 7337ca532a69d9a75c4fb2f0e1ab08290cce16d9 Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 11 May 2020 13:43:14 +0800 Subject: [PATCH 08/12] =?UTF-8?q?=E5=AE=A2=E6=88=B7=E8=A6=81=E6=B1=82?= =?UTF-8?q?=E5=8D=95=E7=8B=AC=E5=BC=84=E5=87=BA=E5=88=86=E7=89=87=E4=B8=8A?= =?UTF-8?q?=E4=BC=A0=E7=9A=84=E5=9F=BA=E6=9C=AC=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/qiniu/http/Client.java | 2 +- .../com/qiniu/storage/FixBlockUploader.java | 267 +++++------------ .../com/qiniu/storage/MultipartUpload.java | 280 ++++++++++++++++++ .../qiniu/storage/FixBlockUploaderTest.java | 46 ++- 4 files changed, 389 insertions(+), 206 deletions(-) create mode 100644 src/main/java/com/qiniu/storage/MultipartUpload.java diff --git a/src/main/java/com/qiniu/http/Client.java b/src/main/java/com/qiniu/http/Client.java index ee47f3390..0fb39fa19 100755 --- a/src/main/java/com/qiniu/http/Client.java +++ b/src/main/java/com/qiniu/http/Client.java @@ -342,7 +342,7 @@ public void accept(String key, Object value) { IpTag tag = new IpTag(); try { res = httpClient.newCall(requestBuilder.tag(tag).build()).execute(); - } catch (IOException e) { + } catch (Exception e) { throw new QiniuException(e); } diff --git a/src/main/java/com/qiniu/storage/FixBlockUploader.java b/src/main/java/com/qiniu/storage/FixBlockUploader.java index d14468ac6..d18de2f8a 100644 --- a/src/main/java/com/qiniu/storage/FixBlockUploader.java +++ b/src/main/java/com/qiniu/storage/FixBlockUploader.java @@ -5,6 +5,9 @@ import com.qiniu.http.Client; import com.qiniu.http.Response; import com.qiniu.util.*; +import com.qiniu.storage.MultipartUpload.EtagIdx; +import com.qiniu.storage.MultipartUpload.OptionsMeta; +import com.qiniu.storage.MultipartUpload.InitRet; import java.io.File; import java.io.IOException; @@ -18,17 +21,27 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +/** + * 分片上传实现 + * https://developer.qiniu.com/kodo/api/6364/multipartupload-interface + * + * 若上传到同区域,如全上传到 华东存储,则可只使用一个实例; + * 若上传到不同区域,则每个区域最好单独使用一个示例。一个实例多区域并发上传可能导致内部频繁报错,上传效率低; + */ public class FixBlockUploader { private final int blockSize; - private final ConfigHelper configHelper; - private final Client client; private final Recorder recorder; + private final MultipartUpload uploader; + private final int retryMax; private String host = null; /** + * 若上传到同区域,如全上传到 华东存储,则可只使用一个实例; + * 若上传到不同区域,则每个区域最好单独使用一个示例。一个实例多区域并发上传可能导致内部频繁报错,上传效率低; + * * @param blockSize block size, eg: 4 * 1024 * 1024 * @param configuration Nullable, if null, then create a new one. * @param client Nullable, if null, then create a new one with configuration. @@ -45,8 +58,7 @@ public FixBlockUploader(int blockSize, Configuration configuration, Client clien if (client == null) { client = new Client(configuration); } - this.configHelper = new ConfigHelper(configuration); - this.client = client; + this.uploader = new MultipartUpload(configuration, client); this.blockSize = blockSize; this.recorder = recorder; this.retryMax = configuration.retryMax; @@ -105,32 +117,23 @@ Response upload(BlockData blockData, Token token, String key, OptionsMeta params ExecutorService pool, int maxRunningBlock) throws QiniuException { try { String bucket = parseBucket(token.getUpToken()); - /* - 上传到七牛存储保存的文件名, 需要进行UrlSafeBase64编码。 - 注意: - 当 key 为空 "" 时表示空的文件名,正常进行 url_safe_base64 编码; - 当 key 为未进行 UrlSafeBase64 编码的字符 ~ 的时候,表示未设置文件名, - 具体行为如分片上传v1: 使用文件的 hash 作为文件名, 如果设置了saveKey则使用saveKey的规则进行文件命名 - */ - String base64Key = key != null ? UrlSafeBase64.encodeToString(key) : "~"; String recordFileKey = (recorder == null) ? "NULL" - : recorder.recorderKeyGenerate(bucket, base64Key, blockData.getContentUUID(), + : recorder.recorderKeyGenerate(bucket, MultipartUpload.genKey(key), blockData.getContentUUID(), this.blockSize + "*:|>?^ \b" + this.getClass().getName()); // must before any http request // - if (host == null) { - host = configHelper.upHost(token.getUpToken()); - } + uploader.initUpHost(token.getUpToken()); + UploadRecordHelper recordHelper = new UploadRecordHelper(recorder, recordFileKey, blockData.repeatable()); // 1. initParts - Record record = initUpload(blockData, recordHelper, bucket, base64Key, token); + Record record = initUpload(blockData, recordHelper, bucket, key, token); boolean repeatable = recorder != null && blockData.repeatable(); Response res; try { // 2. uploadPart - upBlock(blockData, token, bucket, base64Key, repeatable, record, pool, maxRunningBlock); + upBlock(blockData, token, bucket, key, repeatable, record, pool, maxRunningBlock); // 3. completeParts - res = makeFile(bucket, base64Key, token, record.uploadId, record.etagIdxes, + res = makeFile(bucket, key, token, record.uploadId, record.etagIdxes, blockData.getFileName(), params); } catch (QiniuException e) { // if everything is ok, do not need to sync record // @@ -147,7 +150,7 @@ Response upload(BlockData blockData, Token token, String key, OptionsMeta params } Record initUpload(BlockData blockData, UploadRecordHelper recordHelper, - String bucket, String base64Key, Token token) throws QiniuException { + String bucket, String key, Token token) throws QiniuException { Record record = null; if (blockData.repeatable()) { record = recordHelper.reloadRecord(); @@ -158,7 +161,7 @@ record = null; } if (record == null || record.uploadId == null) { - InitRet ret = init(bucket, base64Key, token.getUpToken()); + InitRet ret = init(bucket, key, token); List etagIdxes = new ArrayList<>(); record = initRecord(ret, etagIdxes); @@ -166,16 +169,11 @@ record = initRecord(ret, etagIdxes); return record; } - InitRet init(String bucket, String base64Key, String upToken) throws QiniuException { - String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads"; - byte[] data = new byte[0]; - StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); - String contentType = ""; - + InitRet init(String bucket, String key, Token token) throws QiniuException { Response res = null; try { // 1 - res = client.post(url, data, headers, contentType); + res = uploader.initiateMultipartUpload(bucket, key, token.getUpToken()); } catch (QiniuException e) { if (res == null && e.response != null) { res = e.response; @@ -187,11 +185,11 @@ InitRet init(String bucket, String base64Key, String upToken) throws QiniuExcept // 重试一次,初始不计入重试次数 // if (res == null || res.needRetry()) { if (res == null || res.needSwitchServer()) { - changeHost(upToken, host); + uploader.changeHost(token.getUpToken(), host); } try { // 2 - res = client.post(url, data, headers, contentType); + res = uploader.initiateMultipartUpload(bucket, key, token.getUpToken()); } catch (QiniuException e) { if (res == null && e.response != null) { res = e.response; @@ -202,10 +200,10 @@ InitRet init(String bucket, String base64Key, String upToken) throws QiniuExcept if (res == null || res.needRetry()) { if (res == null || res.needSwitchServer()) { - changeHost(upToken, host); + uploader.changeHost(token.getUpToken(), host); } // 3 - res = client.post(url, data, headers, contentType); + res = uploader.initiateMultipartUpload(bucket, key, token.getUpToken()); } } @@ -222,14 +220,14 @@ InitRet init(String bucket, String base64Key, String upToken) throws QiniuExcept } - private void upBlock(BlockData blockData, Token token, String bucket, String base64Key, boolean repeatable, + private void upBlock(BlockData blockData, Token token, String bucket, String key, boolean repeatable, Record record, ExecutorService pool, int maxRunningBlock) throws QiniuException { boolean useParallel = useParallel(pool, blockData, record); if (!useParallel) { - seqUpload(blockData, token, bucket, base64Key, record); + seqUpload(blockData, token, bucket, key, record); } else { - parallelUpload(blockData, token, bucket, base64Key, record, repeatable, pool, maxRunningBlock); + parallelUpload(blockData, token, bucket, key, record, repeatable, pool, maxRunningBlock); } } @@ -238,14 +236,14 @@ private boolean useParallel(ExecutorService pool, BlockData blockData, Record re } private void seqUpload(BlockData blockData, Token token, String bucket, - String base64Key, Record record) throws QiniuException { + String key, Record record) throws QiniuException { final String uploadId = record.uploadId; final List etagIdxes = record.etagIdxes; RetryCounter counter = new NormalRetryCounter(retryMax); while (blockData.hasNext()) { try { blockData.nextBlock(); - } catch (IOException e) { + } catch (Exception e) { throw new QiniuException(e, e.getMessage()); } DataWraper wrapper = blockData.getCurrentBlockData(); @@ -255,9 +253,9 @@ private void seqUpload(BlockData blockData, Token token, String bucket, EtagIdx etagIdx; try { - etagIdx = uploadBlock(bucket, base64Key, token, uploadId, + etagIdx = uploadBlock(bucket, key, token, uploadId, wrapper.getData(), wrapper.getSize(), wrapper.getIndex(), counter); - } catch (IOException e) { + } catch (Exception e) { throw new QiniuException(e, e.getMessage()); } etagIdxes.add(etagIdx); @@ -267,7 +265,7 @@ private void seqUpload(BlockData blockData, Token token, String bucket, } private void parallelUpload(BlockData blockData, final Token token, - final String bucket, final String base64Key, Record record, + final String bucket, final String key, Record record, boolean needRecord, ExecutorService pool, int maxRunningBlock) throws QiniuException { final String uploadId = record.uploadId; final List etagIdxes = record.etagIdxes; @@ -278,7 +276,7 @@ private void parallelUpload(BlockData blockData, final Token token, while (blockData.hasNext()) { try { blockData.nextBlock(); - } catch (IOException e) { + } catch (Exception e) { qiniuEx = new QiniuException(e, e.getMessage()); break; } @@ -290,7 +288,7 @@ private void parallelUpload(BlockData blockData, final Token token, Callable runner = new Callable() { @Override public EtagIdx call() throws Exception { - return uploadBlock(bucket, base64Key, token, uploadId, + return uploadBlock(bucket, key, token, uploadId, wrapper.getData(), wrapper.getSize(), wrapper.getIndex(), counter); } }; @@ -352,20 +350,13 @@ private void waitingEnough(int maxRunningBlock, List> futures) { } } - EtagIdx uploadBlock(String bucket, String base64Key, Token token, String uploadId, byte[] data, + EtagIdx uploadBlock(String bucket, String key, Token token, String uploadId, byte[] data, int dataLength, int partNum, RetryCounter counter) throws QiniuException { - String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads/" + uploadId + "/" + partNum; - String md5 = Md5.md5(data, 0, dataLength); - StringMap headers = new StringMap(). - put("Content-MD5", md5). - put("Authorization", "UpToken " + token.getUpToken()); - - Response res = uploadBlockWithRetry(url, token, headers, data, dataLength, counter); + Response res = uploadBlockWithRetry(bucket, key, token, uploadId, data, dataLength, partNum, counter); try { - StringMap m = res.jsonToMap(); - // String qMd5 = m.get("md5").toString(); - String etag = m.get("etag").toString(); - if (/*md5.equals(qMd5) && */etag.length() > 10) { + MultipartUpload.UploadPartRet ret = res.jsonToObject(MultipartUpload.UploadPartRet.class); + String etag = ret.getEtag(); + if (etag.length() > 10) { return new EtagIdx(etag, partNum, dataLength); } } catch (Exception e) { @@ -374,18 +365,18 @@ EtagIdx uploadBlock(String bucket, String base64Key, Token token, String uploadI throw new QiniuException(res); } - Response uploadBlockWithRetry(String url, Token token, StringMap headers, - byte[] data, int dataLength, RetryCounter counter) + Response uploadBlockWithRetry(String bucket, String key, Token token, String uploadId, byte[] data, + int dataLength, int partNum, RetryCounter counter) throws QiniuException { // 在 最多重试次数 范围内, 每个块至多上传 3 次 // // 1 - Response res = uploadBlock1(url, data, dataLength, headers, true); + Response res = uploadBlock1(bucket, key, token, uploadId, data, dataLength, partNum, true); if (res.isOK()) { return res; } if (res.needSwitchServer()) { - changeHost(token.getUpToken(), host); + uploader.changeHost(token.getUpToken(), host); } if (!counter.inRange()) { @@ -395,14 +386,14 @@ Response uploadBlockWithRetry(String url, Token token, StringMap headers, if (res.needRetry()) { counter.retried(); // 2 - res = uploadBlock1(url, data, dataLength, headers, true); + res = uploadBlock1(bucket, key, token, uploadId, data, dataLength, partNum, true); if (res.isOK()) { return res; } if (res.needSwitchServer()) { - changeHost(token.getUpToken(), host); + uploader.changeHost(token.getUpToken(), host); } if (!counter.inRange()) { @@ -412,19 +403,18 @@ Response uploadBlockWithRetry(String url, Token token, StringMap headers, if (res.needRetry()) { counter.retried(); // 3 - res = uploadBlock1(url, data, dataLength, headers, false); + res = uploadBlock1(bucket, key, token, uploadId, data, dataLength, partNum, false); } } return res; } - Response uploadBlock1(String url, byte[] data, - int dataLength, StringMap headers, boolean ignoreError) throws QiniuException { + Response uploadBlock1(String bucket, String key, Token token, String uploadId, byte[] data, + int dataLength, int partNum, boolean ignoreError) throws QiniuException { // put PUT try { - Response res = client.put(url, data, 0, dataLength, headers, "application/octet-stream"); - return res; + return uploader.uploadPart(bucket, key, token.getUpToken(), uploadId, data, 0, dataLength, partNum); } catch (QiniuException e) { if (ignoreError) { if (e.response != null) { @@ -437,26 +427,20 @@ Response uploadBlock1(String url, byte[] data, } } - Response makeFile(String bucket, String base64Key, Token token, String uploadId, List etags, + Response makeFile(String bucket, String key, Token token, String uploadId, List etags, String fileName, OptionsMeta params) throws QiniuException { - String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads/" + uploadId; - final StringMap headers = new StringMap().put("Authorization", "UpToken " + token.getUpToken()); - sortByPartNumberAsc(etags); - byte[] data = new MakefileBody(etags, fileName, params) - .json().getBytes(Charset.forName("UTF-8")); - // 1 - Response res = makeFile1(url, data, headers, true); + Response res = makeFile1(bucket, key, token, uploadId, etags, fileName, params, true); if (res.needRetry()) { // 2 - res = makeFile1(url, data, headers, true); + res = makeFile1(bucket, key, token, uploadId, etags, fileName, params, true); } if (res.needRetry()) { if (res.needSwitchServer()) { - changeHost(token.getUpToken(), host); + uploader.changeHost(token.getUpToken(), host); } // 3 - res = makeFile1(url, data, headers, false); + res = makeFile1(bucket, key, token, uploadId, etags, fileName, params, false); } // keep the same, with com.qiniu.http.Client#L337 if (res.statusCode >= 300) { @@ -465,10 +449,10 @@ Response makeFile(String bucket, String base64Key, Token token, String uploadId, return res; } - Response makeFile1(String url, byte[] data, StringMap headers, boolean ignoreError) throws QiniuException { + Response makeFile1(String bucket, String key, Token token, String uploadId, List etags, + String fileName, OptionsMeta params, boolean ignoreError) throws QiniuException { try { - Response res = client.post(url, data, headers, "application/json"); - return res; + return uploader.completeMultipartUpload(bucket, key, token.getUpToken(), uploadId, etags, fileName, params); } catch (QiniuException e) { if (ignoreError) { if (e.response != null) { @@ -481,15 +465,6 @@ Response makeFile1(String url, byte[] data, StringMap headers, boolean ignoreErr } } - private void changeHost(String upToken, String host) { - try { - this.host = configHelper.tryChangeUpHost(upToken, host); - } catch (Exception e) { - // ignore - // use the old up host // - } - } - private String parseBucket(String upToken) throws QiniuException { try { String part3 = upToken.split(":")[2]; @@ -502,75 +477,8 @@ private String parseBucket(String upToken) throws QiniuException { } } - static void sortByPartNumberAsc(List etags) { - Collections.sort(etags, new Comparator() { - @Override - public int compare(EtagIdx o1, EtagIdx o2) { - return o1.partNumber - o2.partNumber; // small enough and both greater than 0 // - } - }); - } - class MakefileBody { - List parts; - String fname; - String mimeType; - Map metadata; - Map customVars; - - MakefileBody(List etags, String fileName, OptionsMeta params) { - this.parts = etags; - this.fname = fileName; - if (params != null) { - this.mimeType = params.mimeType; - if (params.metadata != null && params.metadata.size() > 0) { - this.metadata = filterParam(params.metadata, "X-Qn-Meta-"); - } - if (params.customVars != null && params.customVars.size() > 0) { - this.customVars = filterParam(params.customVars, "x:"); - } - } - } - - private Map filterParam(StringMap param, final String keyPrefix) { - final Map ret = new HashMap<>(); - final String prefix = keyPrefix.toLowerCase(); - param.forEach(new StringMap.Consumer() { - @Override - public void accept(String key, Object value) { - if (key != null && value != null && !StringUtils.isNullOrEmpty(value.toString()) - && key.toLowerCase().startsWith(prefix)) { - ret.put(key, value); - } - } - }); - return ret; - } - - public String json() { - return new Gson().toJson(this); - } - } - - - class EtagIdx { - String etag; // mkfile - int partNumber; // mkfile - transient int size; // 本地使用,不写入 json 断点记录 // - - EtagIdx(String etag, int idx, int size) { - this.etag = etag; - this.partNumber = idx; - this.size = size; - } - - public String toString() { - return new Gson().toJson(this); - } - - } - /////////////////////////////////////// @@ -579,7 +487,7 @@ class Record { // second long expireAt; String uploadId; - long size; // sum(current putParts size) EtagIdx.size 未序列化,历史断点记录没法获取到已上传大小 // + long size; List etagIdxes; } @@ -589,8 +497,10 @@ Record initRecord(InitRet ret, List etagIdxes) { record.uploadId = ret.uploadId; //// 服务端 7 天内有效,设置 5 天 //// record.expireAt = ret.expireAt - 3600 * 24 * 2; - record.size = 0; // 本次上传到需要写断点记录时上传的总大小,EtagIdx.size 未序列化,历史断点记录没法获取到已上传大小 // record.etagIdxes = etagIdxes != null ? etagIdxes : new ArrayList(); + for (EtagIdx l : record.etagIdxes) { + record.size += l.size; + } return record; } @@ -631,7 +541,7 @@ public void delRecord() { public void syncRecord(Record record) { if (needRecord && recorder != null && record.etagIdxes.size() > 0) { - sortByPartNumberAsc(record.etagIdxes); + MultipartUpload.sortByPartNumberAsc(record.etagIdxes); recorder.set(recordFileKey, new Gson().toJson(record).getBytes(Charset.forName("UTF-8"))); } } @@ -662,12 +572,6 @@ public boolean isActiveRecord(Record record, BlockData blockData) { /////////////////////////////////////// - class InitRet { - String uploadId; - long expireAt; - } - - abstract static class BlockData { protected final int blockDataSize; @@ -980,39 +884,4 @@ public String getUpToken() { } } - public static class OptionsMeta { - String mimeType; - StringMap metadata; - StringMap customVars; - - public OptionsMeta setMimeType(String mimeType) { - this.mimeType = mimeType; - return this; - } - - /** - * @param key start with X-Qn-Meta- - * @param value not null or empty - */ - public OptionsMeta addMetadata(String key, String value) { - if (metadata == null) { - metadata = new StringMap(); - } - metadata.put(key, value); - return this; - } - - /** - * @param key start with x: - * @param value not null or empty - */ - public OptionsMeta addCustomVar(String key, String value) { - if (customVars == null) { - customVars = new StringMap(); - } - customVars.put(key, value); - return this; - } - } - } diff --git a/src/main/java/com/qiniu/storage/MultipartUpload.java b/src/main/java/com/qiniu/storage/MultipartUpload.java new file mode 100644 index 000000000..8fc10e449 --- /dev/null +++ b/src/main/java/com/qiniu/storage/MultipartUpload.java @@ -0,0 +1,280 @@ +package com.qiniu.storage; + +import com.google.gson.ExclusionStrategy; +import com.google.gson.FieldAttributes; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.qiniu.common.QiniuException; +import com.qiniu.http.Client; +import com.qiniu.http.Response; +import com.qiniu.util.Md5; +import com.qiniu.util.StringMap; +import com.qiniu.util.StringUtils; +import com.qiniu.util.UrlSafeBase64; + +import java.nio.charset.Charset; +import java.util.*; + +/** + * 分片上传基础接口实现,以及部分辅助代码 + * https://developer.qiniu.com/kodo/api/6364/multipartupload-interface + * + * 若上传到同区域,如全上传到 华东存储,则可只使用一个实例; + * 若上传到不同区域,则每个区域最好单独使用一个示例。一个实例多区域并发上传可能导致内部频繁报错,上传效率低; + */ +public class MultipartUpload { + private ConfigHelper configHelper; + private Client client; + private String host; + + /** + * 若上传到同区域,如全上传到 华东存储,则可只使用一个实例; + * 若上传到不同区域,则每个区域最好单独使用一个示例。一个实例多区域并发上传可能导致内部频繁报错,上传效率低; + * + * @param configuration Nullable, if null, then create a new one. + * @param client Nullable, if null, then create a new one with configuration. + */ + public MultipartUpload(Configuration configuration, Client client) { + if (configuration == null) { + configuration = new Configuration(); + } + if (client == null) { + client = new Client(configuration); + } + this.configHelper = new ConfigHelper(configuration); + this.client = client; + } + + public void initUpHost(String uploadToken) throws QiniuException { + if (host == null) { + host = configHelper.upHost(uploadToken); + } + } + + public void changeHost(String upToken, String host) { + try { + this.host = configHelper.tryChangeUpHost(upToken, host); + } catch (Exception e) { + // ignore + // use the old up host // + } + } + + public Response initiateMultipartUpload(String bucket, String key, String upToken) throws QiniuException { + initUpHost(upToken); + + String url = host + "/buckets/" + bucket + "/objects/" + genKey(key) + "/uploads"; + byte[] data = new byte[0]; + StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); + String contentType = ""; + return client.post(url, data, headers, contentType); + } + + public Response uploadPart(String bucket, String key, String upToken, String uploadId, byte[] data, + int dataOff, int dataLength, int partNum) throws QiniuException { + String url = host + "/buckets/" + bucket + "/objects/" + genKey(key) + "/uploads/" + uploadId + "/" + partNum; + String md5 = Md5.md5(data, dataOff, dataLength); + StringMap headers = new StringMap(). + put("Content-MD5", md5). + put("Authorization", "UpToken " + upToken); + return client.put(url, data, dataOff, dataLength, headers, "application/octet-stream"); + } + + public Response uploadPart(String bucket, String key, String upToken, String uploadId, byte[] data, + int partNum) throws QiniuException { + return uploadPart(bucket, key, upToken, uploadId, data, 0, data.length, partNum); + } + + public Response completeMultipartUpload(String bucket, String key, String upToken, + String uploadId, List etags, + String fileName, OptionsMeta params) throws QiniuException { + String url = host + "/buckets/" + bucket + "/objects/" + genKey(key) + "/uploads/" + uploadId; + final StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); + sortByPartNumberAsc(etags); + byte[] data = new MakefileBody(etags, fileName, params) + .json().getBytes(Charset.forName("UTF-8")); + return client.post(url, data, headers, "application/json"); + } + + public Response abortMultipartUpload(String bucket, String key, String upToken, + String uploadId) throws QiniuException { + String url = host + "/buckets/" + bucket + "/objects/" + genKey(key) + "/uploads/" + uploadId; + StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); + return client.delete(url, headers); + } + + /** + * @param key 指定的上传文件名。 null 对象是表示系统自动生成文件名(key);其它表示使用指定的值为文件名(key) + *

+ * 上传到七牛存储保存的文件名, http 上传时需要对其进行编码,此方法根据要求生成对应的编码。 + * 注意: + * 当 key 为空 "" 时表示空的文件名,正常进行 url_safe_base64 编码 ; + * 当 key 为未进行 UrlSafeBase64 编码的字符 ~ 的时候,表示未设置文件名:对应参数为 null 对象; + * 具体行为如分片上传v1: 使用文件的 hash 作为文件名, 如果设置了saveKey则使用saveKey的规则进行文件命名 + */ + static String genKey(String key) { + String base64Key = key != null ? UrlSafeBase64.encodeToString(key) : "~"; + return base64Key; + } + + /** + * 按照 partNumber 排序 + */ + public static void sortByPartNumberAsc(List etags) { + Collections.sort(etags, new Comparator() { + @Override + public int compare(EtagIdx o1, EtagIdx o2) { + return o1.partNumber - o2.partNumber; // small enough and both greater than 0 // + } + }); + } + + public static class InitRet { + long expireAt; + String uploadId; + + public InitRet(long expireAt, String uploadId) { + this.expireAt = expireAt; + this.uploadId = uploadId; + } + + public long getExpireAt() { + return expireAt; + } + + public String getUploadId() { + return uploadId; + } + } + + public static class UploadPartRet { + String etag; + String md5; + + public UploadPartRet(String etag, String md5) { + this.etag = etag; + this.md5 = md5; + } + + public String getEtag() { + return etag; + } + + public String getMd5() { + return md5; + } + } + + public static class EtagIdx { + String etag; // mkfile need + int partNumber; // mkfile need + int size; + + EtagIdx(String etag, int idx, int size) { + this.etag = etag; + this.partNumber = idx; + this.size = size; + } + + public String toString() { + return new Gson().toJson(this); + } + } + + + public static class OptionsMeta { + String mimeType; + StringMap metadata; + StringMap customVars; + + public OptionsMeta setMimeType(String mimeType) { + this.mimeType = mimeType; + return this; + } + + /** + * @param key start with X-Qn-Meta- + * @param value not null or empty + */ + public OptionsMeta addMetadata(String key, String value) { + if (metadata == null) { + metadata = new StringMap(); + } + metadata.put(key, value); + return this; + } + + /** + * @param key start with x: + * @param value not null or empty + */ + public OptionsMeta addCustomVar(String key, String value) { + if (customVars == null) { + customVars = new StringMap(); + } + customVars.put(key, value); + return this; + } + } + + + class MakefileBody { + List parts; + String fname; + String mimeType; + Map metadata; + Map customVars; + + MakefileBody(List etags, String fileName, OptionsMeta params) { + this.parts = etags; + this.fname = fileName; + if (params != null) { + this.mimeType = params.mimeType; + if (params.metadata != null && params.metadata.size() > 0) { + this.metadata = filterParam(params.metadata, "X-Qn-Meta-"); + } + if (params.customVars != null && params.customVars.size() > 0) { + this.customVars = filterParam(params.customVars, "x:"); + } + } + } + + private Map filterParam(StringMap param, final String keyPrefix) { + final Map ret = new HashMap<>(); + final String prefix = keyPrefix.toLowerCase(); + param.forEach(new StringMap.Consumer() { + @Override + public void accept(String key, Object value) { + if (key != null && value != null && !StringUtils.isNullOrEmpty(value.toString()) + && key.toLowerCase().startsWith(prefix)) { + ret.put(key, value); + } + } + }); + return ret; + } + + public String json() { + GsonBuilder builder = new GsonBuilder(); + Gson gson = builder.setExclusionStrategies(new CompleteMultipartUploadExclusionStrategy()).create(); + return gson.toJson(this); + } + } + + static class CompleteMultipartUploadExclusionStrategy implements ExclusionStrategy { + + /** + * 只需要 etag partNumber ,不需要 size + */ + @Override + public boolean shouldSkipField(FieldAttributes f) { + return EtagIdx.class == f.getDeclaringClass() && "size".equals(f.getName()); + } + + @Override + public boolean shouldSkipClass(Class clazz) { + return false; + } + } + +} diff --git a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java index f9cea6d35..db40f216a 100644 --- a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java +++ b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java @@ -4,10 +4,7 @@ import com.qiniu.common.QiniuException; import com.qiniu.http.Client; import com.qiniu.http.Response; -import com.qiniu.storage.BucketManager; -import com.qiniu.storage.Configuration; -import com.qiniu.storage.FixBlockUploader; -import com.qiniu.storage.Region; +import com.qiniu.storage.*; import com.qiniu.util.EtagV2; import com.qiniu.util.Md5; import com.qiniu.util.StringMap; @@ -33,6 +30,8 @@ public class FixBlockUploaderTest { String bucket; BucketManager bm; + MultipartUpload multipartUpload; + @Before public void init() { init2(false); @@ -45,6 +44,8 @@ private void init2(boolean useHttpsDomains) { up = new FixBlockUploader(blockSize, config, client, null); bucket = TestConfig.testBucket_z0; bm = new BucketManager(TestConfig.testAuth, config); + + multipartUpload = new MultipartUpload(config, client); } @Test @@ -165,7 +166,7 @@ private void template(boolean isStream, int size, boolean https, boolean fixFile p.put("insertOnly", 1); } String token = TestConfig.testAuth.uploadToken(bucket, expectKey, 3600, p); - FixBlockUploader.OptionsMeta param = new FixBlockUploader.OptionsMeta(); + MultipartUpload.OptionsMeta param = new MultipartUpload.OptionsMeta(); param.addMetadata("X-Qn-Meta-liubin", "sb"). addMetadata("X-Qn-Meta-!Content-Type", "text/liubin"). addMetadata("X-Qn-Meta-Cache-Control", "public, max-age=1984"); @@ -254,7 +255,7 @@ public void testMeat() throws IOException { StringMap p = new StringMap().put("returnBody", returnBody); String key = "俩个中文试试1.txt"; String token = TestConfig.testAuth.uploadToken(bucket, key, 3600, p); - FixBlockUploader.OptionsMeta param = new FixBlockUploader.OptionsMeta(); + MultipartUpload.OptionsMeta param = new MultipartUpload.OptionsMeta(); String mimeType = "mimetype/hehe"; param.setMimeType(mimeType); param.addCustomVar("x:biubiu", "duDu/werfhue3"); @@ -286,4 +287,37 @@ class MyRet { public String mimeType; public String biu2biu; } + + @Test + public void testAbort() throws QiniuException { + String bucket = TestConfig.testBucket_z0; + String key = null; + String upToken = TestConfig.testAuth.uploadToken(bucket); // 默认 3600 秒内有效 + // 如果并发上传资源到不同的区域,则 每个区域需使用不同 multipartUpload // + multipartUpload.initUpHost(upToken); // 最好有这一步 // 初次上传、更换上传区域 均需要调用此方法 // + Response initRes = multipartUpload.initiateMultipartUpload(bucket, key, upToken); + MultipartUpload.InitRet initRet = initRes.jsonToObject(MultipartUpload.InitRet.class); + byte[] data = new byte[]{1,2,3,4}; + Response uploadPartRes = multipartUpload.uploadPart(bucket, key, upToken, initRet.getUploadId(), data, 1); + MultipartUpload.UploadPartRet uploadPartRet = uploadPartRes.jsonToObject(MultipartUpload.UploadPartRet.class); + Response abortRes = multipartUpload.abortMultipartUpload(bucket, key, upToken, initRet.getUploadId()); + Assert.assertTrue(abortRes.isOK()); + + abort(""); + abort("sTduhruwefjdhfgitvbor283Gsw.buys"); + } + + public void abort(String key) throws QiniuException { + String bucket = TestConfig.testBucket_z0; + String upToken = TestConfig.testAuth.uploadToken(bucket); // 默认 3600 秒内有效 + multipartUpload.initUpHost(upToken); // 最好有这一步 // + Response initRes = multipartUpload.initiateMultipartUpload(bucket, key, upToken); + MultipartUpload.InitRet initRet = initRes.jsonToObject(MultipartUpload.InitRet.class); + byte[] data = new byte[]{1,2,3,4}; + Response uploadPartRes = multipartUpload.uploadPart(bucket, key, upToken, initRet.getUploadId(), data, 1); + MultipartUpload.UploadPartRet uploadPartRet = uploadPartRes.jsonToObject(MultipartUpload.UploadPartRet.class); + Response abortRes = multipartUpload.abortMultipartUpload(bucket, key, upToken, initRet.getUploadId()); + Assert.assertTrue(abortRes.isOK()); + } + } From a26580c93941a29f3696e1ede850e6bbbf2d99d1 Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 11 May 2020 14:01:41 +0800 Subject: [PATCH 09/12] import --- .../com/qiniu/storage/FixBlockUploader.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/qiniu/storage/FixBlockUploader.java b/src/main/java/com/qiniu/storage/FixBlockUploader.java index d18de2f8a..bceb4ffaa 100644 --- a/src/main/java/com/qiniu/storage/FixBlockUploader.java +++ b/src/main/java/com/qiniu/storage/FixBlockUploader.java @@ -6,8 +6,6 @@ import com.qiniu.http.Response; import com.qiniu.util.*; import com.qiniu.storage.MultipartUpload.EtagIdx; -import com.qiniu.storage.MultipartUpload.OptionsMeta; -import com.qiniu.storage.MultipartUpload.InitRet; import java.io.File; import java.io.IOException; @@ -74,7 +72,7 @@ public Response upload(final File file, final String token, String key, return upload(file, token, key, null, pool, 8); } - public Response upload(final File file, final String token, String key, OptionsMeta params, + public Response upload(final File file, final String token, String key, MultipartUpload.OptionsMeta params, ExecutorService pool, int maxRunningBlock) throws QiniuException { BlockData blockData; try { @@ -99,7 +97,7 @@ public Response upload(final InputStream is, long inputStreamLength, String file public Response upload(final InputStream is, long inputStreamLength, String fileName, - final String token, String key, OptionsMeta params, + final String token, String key, MultipartUpload.OptionsMeta params, ExecutorService pool, int maxRunningBlock) throws QiniuException { BlockData blockData; blockData = new InputStreamBlockData(this.blockSize, is, inputStreamLength, fileName); @@ -107,13 +105,13 @@ public Response upload(final InputStream is, long inputStreamLength, String file } - Response upload(BlockData blockData, String token, String key, OptionsMeta params, + Response upload(BlockData blockData, String token, String key, MultipartUpload.OptionsMeta params, ExecutorService pool, int maxRunningBlock) throws QiniuException { return upload(blockData, new StaticToken(token), key, params, pool, maxRunningBlock); } - Response upload(BlockData blockData, Token token, String key, OptionsMeta params, + Response upload(BlockData blockData, Token token, String key, MultipartUpload.OptionsMeta params, ExecutorService pool, int maxRunningBlock) throws QiniuException { try { String bucket = parseBucket(token.getUpToken()); @@ -161,7 +159,7 @@ record = null; } if (record == null || record.uploadId == null) { - InitRet ret = init(bucket, key, token); + MultipartUpload.InitRet ret = init(bucket, key, token); List etagIdxes = new ArrayList<>(); record = initRecord(ret, etagIdxes); @@ -169,7 +167,7 @@ record = initRecord(ret, etagIdxes); return record; } - InitRet init(String bucket, String key, Token token) throws QiniuException { + MultipartUpload.InitRet init(String bucket, String key, Token token) throws QiniuException { Response res = null; try { // 1 @@ -208,7 +206,7 @@ InitRet init(String bucket, String key, Token token) throws QiniuException { } try { - InitRet ret = res.jsonToObject(InitRet.class); + MultipartUpload.InitRet ret = res.jsonToObject(MultipartUpload.InitRet.class); if (ret != null && ret.uploadId != null && ret.uploadId.length() > 10 && ret.expireAt > 1000) { return ret; } @@ -428,7 +426,7 @@ Response uploadBlock1(String bucket, String key, Token token, String uploadId, b } Response makeFile(String bucket, String key, Token token, String uploadId, List etags, - String fileName, OptionsMeta params) throws QiniuException { + String fileName, MultipartUpload.OptionsMeta params) throws QiniuException { // 1 Response res = makeFile1(bucket, key, token, uploadId, etags, fileName, params, true); if (res.needRetry()) { @@ -450,7 +448,7 @@ Response makeFile(String bucket, String key, Token token, String uploadId, List< } Response makeFile1(String bucket, String key, Token token, String uploadId, List etags, - String fileName, OptionsMeta params, boolean ignoreError) throws QiniuException { + String fileName, MultipartUpload.OptionsMeta params, boolean ignoreError) throws QiniuException { try { return uploader.completeMultipartUpload(bucket, key, token.getUpToken(), uploadId, etags, fileName, params); } catch (QiniuException e) { @@ -492,7 +490,7 @@ class Record { } - Record initRecord(InitRet ret, List etagIdxes) { + Record initRecord(MultipartUpload.InitRet ret, List etagIdxes) { Record record = new Record(); record.uploadId = ret.uploadId; //// 服务端 7 天内有效,设置 5 天 //// From 8b4e38582b6e37c5a50080f313bd29e015175c9e Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 11 May 2020 14:14:04 +0800 Subject: [PATCH 10/12] a --- src/main/java/com/qiniu/storage/MultipartUpload.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/qiniu/storage/MultipartUpload.java b/src/main/java/com/qiniu/storage/MultipartUpload.java index 8fc10e449..4b065cbea 100644 --- a/src/main/java/com/qiniu/storage/MultipartUpload.java +++ b/src/main/java/com/qiniu/storage/MultipartUpload.java @@ -56,7 +56,7 @@ public void changeHost(String upToken, String host) { this.host = configHelper.tryChangeUpHost(upToken, host); } catch (Exception e) { // ignore - // use the old up host // + // use the old up host } } From 09e59567d5838738e669589928da6f709863c720 Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 1 Jun 2020 14:53:37 +0800 Subject: [PATCH 11/12] add listParts --- .../java/com/qiniu/storage/Configuration.java | 17 +++- .../com/qiniu/storage/MultipartUpload.java | 97 +++++++++++++++++-- 2 files changed, 106 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/qiniu/storage/Configuration.java b/src/main/java/com/qiniu/storage/Configuration.java index 056c38d89..f52f8e9d7 100644 --- a/src/main/java/com/qiniu/storage/Configuration.java +++ b/src/main/java/com/qiniu/storage/Configuration.java @@ -18,12 +18,13 @@ public final class Configuration implements Cloneable { public static String defaultRsHost = "rs.qiniu.com"; public static String defaultApiHost = "api.qiniu.com"; public static String defaultUcHost = "uc.qbox.me"; + /** - * 使用的Region + * 使用的 Region 设置 region 请调用 setRegion */ public Region region; /** - * 使用的Zone + * 使用的 Zone 。推荐使用 Region ,设置 region 请调用 setRegion */ @Deprecated public Zone zone; @@ -107,6 +108,18 @@ public Configuration(Zone zone) { configHelper = new ConfigHelper(this); } + public void setRegion(Region region) { + this.region = region; + configHelper = new ConfigHelper(this); + } + + @Deprecated + public void setZone(Zone zone) { + this.region = null; + this.zone = zone; + configHelper = new ConfigHelper(this); + } + public Configuration clone() { try { return (Configuration) super.clone(); diff --git a/src/main/java/com/qiniu/storage/MultipartUpload.java b/src/main/java/com/qiniu/storage/MultipartUpload.java index 4b065cbea..3f935a8f4 100644 --- a/src/main/java/com/qiniu/storage/MultipartUpload.java +++ b/src/main/java/com/qiniu/storage/MultipartUpload.java @@ -17,8 +17,8 @@ /** * 分片上传基础接口实现,以及部分辅助代码 - * https://developer.qiniu.com/kodo/api/6364/multipartupload-interface - * + * 接口参考 https://developer.qiniu.com/kodo/api/6364/multipartupload-interface + *

* 若上传到同区域,如全上传到 华东存储,则可只使用一个实例; * 若上传到不同区域,则每个区域最好单独使用一个示例。一个实例多区域并发上传可能导致内部频繁报错,上传效率低; */ @@ -85,6 +85,37 @@ public Response uploadPart(String bucket, String key, String upToken, String upl return uploadPart(bucket, key, upToken, uploadId, data, 0, data.length, partNum); } + public Response listParts(String bucket, String key, String upToken, String uploadId, int maxParts, String partNumberMarker) throws QiniuException { + String url = host + "/buckets/" + bucket + "/objects/" + genKey(key) + "/uploads/" + uploadId + "?max-parts=" + maxParts; + if (!StringUtils.isNullOrEmpty(partNumberMarker)) { + url += "&part-number-marker=" + partNumberMarker; + } + + final StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken); + return client.get(url, headers); + } + + public List listPartsAll(String bucket, String key, String upToken, String uploadId) throws QiniuException { + return listPartsAll(bucket, key, upToken, uploadId, 1000); //默认值:1,000 ,最大值:1,000 + } + + public List listPartsAll(String bucket, String key, String upToken, String uploadId, int maxParts) throws QiniuException { + List parts = new ArrayList(); + String partNumberMarker = null; + while (true) { + Response res = listParts(bucket, key, upToken, uploadId, maxParts, partNumberMarker); + ListPartsRet ret = res.jsonToObject(ListPartsRet.class); + parts.addAll(ret.getParts()); + if ("0".equalsIgnoreCase(ret.getPartNumberMarker())) { + break; + } + partNumberMarker = ret.getPartNumberMarker(); + } + sortByPartNumberAsc(parts); + return parts; + } + + public Response completeMultipartUpload(String bucket, String key, String upToken, String uploadId, List etags, String fileName, OptionsMeta params) throws QiniuException { @@ -118,7 +149,7 @@ static String genKey(String key) { } /** - * 按照 partNumber 排序 + * 按照 partNumber 升序排序 */ public static void sortByPartNumberAsc(List etags) { Collections.sort(etags, new Comparator() { @@ -169,18 +200,71 @@ public static class EtagIdx { String etag; // mkfile need int partNumber; // mkfile need int size; + long putTime; - EtagIdx(String etag, int idx, int size) { + public EtagIdx(String etag, int idx, int size) { this.etag = etag; this.partNumber = idx; this.size = size; } + public EtagIdx(String etag, int idx, int size, long putTime) { + this.etag = etag; + this.partNumber = idx; + this.size = size; + this.putTime = putTime; + } + + public String getEtag() { + return etag; + } + + public int getPartNumber() { + return partNumber; + } + + public int getSize() { + return size; + } + + public long getPutTime() { + return putTime; + } + public String toString() { return new Gson().toJson(this); } } + public static class ListPartsRet { + String uploadId; + long expireAt; + String partNumberMarker; + List parts; + + public ListPartsRet(String uploadId, long expireAt, String partNumberMarker, List parts) { + this.uploadId = uploadId; + this.expireAt = expireAt; + this.partNumberMarker = partNumberMarker; + this.parts = parts; + } + + public String getUploadId() { + return uploadId; + } + + public long getExpireAt() { + return expireAt; + } + + public String getPartNumberMarker() { + return partNumberMarker; + } + + public List getParts() { + return parts; + } + } public static class OptionsMeta { String mimeType; @@ -264,11 +348,12 @@ public String json() { static class CompleteMultipartUploadExclusionStrategy implements ExclusionStrategy { /** - * 只需要 etag partNumber ,不需要 size + * 只需要 etag partNumber ,不需要 size、putTime */ @Override public boolean shouldSkipField(FieldAttributes f) { - return EtagIdx.class == f.getDeclaringClass() && "size".equals(f.getName()); + return EtagIdx.class == f.getDeclaringClass() && + ("size".equals(f.getName()) || "putTime".equals(f.getName())); } @Override From dbd7a7f0de07336fe8fa0200b735a53cd06e36bf Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 1 Jun 2020 14:54:07 +0800 Subject: [PATCH 12/12] add test [ci skip] --- .../qiniu/storage/FixBlockUploaderTest.java | 56 +++++++++++++------ 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java index db40f216a..acd6c43ce 100644 --- a/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java +++ b/src/test/java/test/com/qiniu/storage/FixBlockUploaderTest.java @@ -5,6 +5,7 @@ import com.qiniu.http.Client; import com.qiniu.http.Response; import com.qiniu.storage.*; +import com.qiniu.util.Auth; import com.qiniu.util.EtagV2; import com.qiniu.util.Md5; import com.qiniu.util.StringMap; @@ -18,6 +19,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.util.Date; +import java.util.List; import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; @@ -289,33 +291,53 @@ class MyRet { } @Test - public void testAbort() throws QiniuException { - String bucket = TestConfig.testBucket_z0; - String key = null; - String upToken = TestConfig.testAuth.uploadToken(bucket); // 默认 3600 秒内有效 - // 如果并发上传资源到不同的区域,则 每个区域需使用不同 multipartUpload // - multipartUpload.initUpHost(upToken); // 最好有这一步 // 初次上传、更换上传区域 均需要调用此方法 // - Response initRes = multipartUpload.initiateMultipartUpload(bucket, key, upToken); - MultipartUpload.InitRet initRet = initRes.jsonToObject(MultipartUpload.InitRet.class); - byte[] data = new byte[]{1,2,3,4}; - Response uploadPartRes = multipartUpload.uploadPart(bucket, key, upToken, initRet.getUploadId(), data, 1); - MultipartUpload.UploadPartRet uploadPartRet = uploadPartRes.jsonToObject(MultipartUpload.UploadPartRet.class); - Response abortRes = multipartUpload.abortMultipartUpload(bucket, key, upToken, initRet.getUploadId()); - Assert.assertTrue(abortRes.isOK()); - - abort(""); - abort("sTduhruwefjdhfgitvbor283Gsw.buys"); + public void listThenAbort() throws QiniuException { + listThenAbort(null); + listThenAbort(""); + listThenAbort("sTduhruwefjdhfgitvbor283Gsw.buys"); } - public void abort(String key) throws QiniuException { + + public void listThenAbort(String key) throws QiniuException { String bucket = TestConfig.testBucket_z0; String upToken = TestConfig.testAuth.uploadToken(bucket); // 默认 3600 秒内有效 + multipartUpload.initUpHost(upToken); // 最好有这一步 // Response initRes = multipartUpload.initiateMultipartUpload(bucket, key, upToken); MultipartUpload.InitRet initRet = initRes.jsonToObject(MultipartUpload.InitRet.class); + + // 实际上传中,除 最后一块外,其它块大小必须大于 1M,否则 completeMultipartUpload 会报错 // byte[] data = new byte[]{1,2,3,4}; Response uploadPartRes = multipartUpload.uploadPart(bucket, key, upToken, initRet.getUploadId(), data, 1); MultipartUpload.UploadPartRet uploadPartRet = uploadPartRes.jsonToObject(MultipartUpload.UploadPartRet.class); + + MultipartUpload.EtagIdx e1 = new MultipartUpload.EtagIdx(uploadPartRet.getEtag(), 1, data.length); + System.out.println("uploadPart: " + e1); + + data = new byte[]{1,2,3,4,56,7,8}; + uploadPartRes = multipartUpload.uploadPart(bucket, key, upToken, initRet.getUploadId(), data, 2); + uploadPartRet = uploadPartRes.jsonToObject(MultipartUpload.UploadPartRet.class); + + MultipartUpload.EtagIdx e2 = new MultipartUpload.EtagIdx(uploadPartRet.getEtag(), 2, data.length); + System.out.println("uploadPart: " + e2); + + List listRet = multipartUpload.listPartsAll(bucket, key, upToken, initRet.getUploadId(), 1); + System.out.println("listPartsAll: "+ listRet); + Assert.assertEquals(2, listRet.size()); + Assert.assertEquals(e1.getPartNumber(), listRet.get(0).getPartNumber()); + Assert.assertEquals(e1.getEtag(), listRet.get(0).getEtag()); + Assert.assertEquals(e2.getPartNumber(), listRet.get(1).getPartNumber()); + Assert.assertEquals(e2.getEtag(), listRet.get(1).getEtag()); + + + listRet = multipartUpload.listPartsAll(bucket, key, upToken, initRet.getUploadId()); + System.out.println("listPartsAll: "+ listRet); + Assert.assertEquals(2, listRet.size()); + Assert.assertEquals(e1.getPartNumber(), listRet.get(0).getPartNumber()); + Assert.assertEquals(e1.getEtag(), listRet.get(0).getEtag()); + Assert.assertEquals(e2.getPartNumber(), listRet.get(1).getPartNumber()); + Assert.assertEquals(e2.getEtag(), listRet.get(1).getEtag()); + Response abortRes = multipartUpload.abortMultipartUpload(bucket, key, upToken, initRet.getUploadId()); Assert.assertTrue(abortRes.isOK()); }