-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HADOOP-18656: [ABFS] Adding Support for Paginated Delete for Large Directories in HNS Account #6409
Changes from 8 commits
4580d3b
e5605ac
4576cea
1b0e9a3
5567944
42010aa
db95738
197f3bc
1ab45fa
169e1d7
2b502e4
d7d9d06
f500632
a31d3ae
365aa51
47f6623
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,7 +92,7 @@ public class AbfsClient implements Closeable { | |
|
||
private final URL baseUrl; | ||
private final SharedKeyCredentials sharedKeyCredentials; | ||
private String xMsVersion = DECEMBER_2019_API_VERSION; | ||
private ApiVersion xMsVersion = ApiVersion.getCurrentVersion(); | ||
private final ExponentialRetryPolicy retryPolicy; | ||
private final String filesystem; | ||
private final AbfsConfiguration abfsConfiguration; | ||
|
@@ -139,7 +139,7 @@ private AbfsClient(final URL baseUrl, | |
|
||
if (encryptionContextProvider != null) { | ||
this.encryptionContextProvider = encryptionContextProvider; | ||
xMsVersion = APRIL_2021_API_VERSION; // will be default once server change deployed | ||
xMsVersion = ApiVersion.APR_10_2021; // will be default once server change deployed | ||
encryptionType = EncryptionType.ENCRYPTION_CONTEXT; | ||
} else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) { | ||
clientProvidedEncryptionKey = | ||
|
@@ -233,9 +233,9 @@ AbfsThrottlingIntercept getIntercept() { | |
return intercept; | ||
} | ||
|
||
List<AbfsHttpHeader> createDefaultHeaders() { | ||
List<AbfsHttpHeader> createDefaultHeaders(ApiVersion xMsVersion) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taken all |
||
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>(); | ||
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion)); | ||
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion.toString())); | ||
requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON | ||
+ COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM)); | ||
requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET, | ||
|
@@ -305,7 +305,7 @@ AbfsUriQueryBuilder createDefaultUriQueryBuilder() { | |
|
||
public AbfsRestOperation createFilesystem(TracingContext tracingContext) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder(); | ||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); | ||
|
@@ -320,7 +320,7 @@ public AbfsRestOperation createFilesystem(TracingContext tracingContext) | |
|
||
public AbfsRestOperation setFilesystemProperties(final String properties, | ||
TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
// JDK7 does not support PATCH, so to work around the issue we will use | ||
// PUT and specify the real method in the X-Http-Method-Override header. | ||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, | ||
|
@@ -345,7 +345,7 @@ public AbfsRestOperation setFilesystemProperties(final String properties, | |
public AbfsRestOperation listPath(final String relativePath, final boolean recursive, final int listMaxResults, | ||
final String continuation, TracingContext tracingContext) | ||
throws IOException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); | ||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); | ||
|
@@ -367,7 +367,7 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur | |
} | ||
|
||
public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); | ||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); | ||
|
@@ -383,7 +383,7 @@ public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext) | |
} | ||
|
||
public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); | ||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); | ||
|
@@ -434,7 +434,7 @@ public AbfsRestOperation createPath(final String path, | |
final ContextEncryptionAdapter contextEncryptionAdapter, | ||
final TracingContext tracingContext) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
if (isFile) { | ||
addEncryptionKeyRequestHeaders(path, requestHeaders, true, | ||
contextEncryptionAdapter, tracingContext); | ||
|
@@ -495,7 +495,7 @@ public AbfsRestOperation createPath(final String path, | |
} | ||
|
||
public AbfsRestOperation acquireLease(final String path, int duration, TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION)); | ||
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration))); | ||
|
@@ -515,7 +515,7 @@ public AbfsRestOperation acquireLease(final String path, int duration, TracingCo | |
|
||
public AbfsRestOperation renewLease(final String path, final String leaseId, | ||
TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION)); | ||
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); | ||
|
@@ -534,7 +534,7 @@ public AbfsRestOperation renewLease(final String path, final String leaseId, | |
|
||
public AbfsRestOperation releaseLease(final String path, | ||
final String leaseId, TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); | ||
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); | ||
|
@@ -553,7 +553,7 @@ public AbfsRestOperation releaseLease(final String path, | |
|
||
public AbfsRestOperation breakLease(final String path, | ||
TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION)); | ||
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD)); | ||
|
@@ -601,7 +601,7 @@ public AbfsClientRenameResult renamePath( | |
boolean isMetadataIncompleteState, | ||
boolean isNamespaceEnabled) | ||
throws IOException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
final boolean hasEtag = !isEmpty(sourceEtag); | ||
|
||
|
@@ -797,7 +797,7 @@ public AbfsRestOperation append(final String path, final byte[] buffer, | |
AppendRequestParameters reqParams, final String cachedSasToken, | ||
ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
addEncryptionKeyRequestHeaders(path, requestHeaders, false, | ||
contextEncryptionAdapter, tracingContext); | ||
if (reqParams.isExpectHeaderEnabled()) { | ||
|
@@ -929,7 +929,7 @@ public AbfsRestOperation flush(final String path, final long position, | |
final String cachedSasToken, final String leaseId, | ||
ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
addEncryptionKeyRequestHeaders(path, requestHeaders, false, | ||
contextEncryptionAdapter, tracingContext); | ||
// JDK7 does not support PATCH, so to workaround the issue we will use | ||
|
@@ -962,7 +962,7 @@ public AbfsRestOperation flush(final String path, final long position, | |
public AbfsRestOperation setPathProperties(final String path, final String properties, | ||
final TracingContext tracingContext, final ContextEncryptionAdapter contextEncryptionAdapter) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
addEncryptionKeyRequestHeaders(path, requestHeaders, false, | ||
contextEncryptionAdapter, tracingContext); | ||
// JDK7 does not support PATCH, so to workaround the issue we will use | ||
|
@@ -990,7 +990,7 @@ public AbfsRestOperation getPathStatus(final String path, | |
final boolean includeProperties, final TracingContext tracingContext, | ||
final ContextEncryptionAdapter contextEncryptionAdapter) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); | ||
String operation = SASTokenProvider.GET_PROPERTIES_OPERATION; | ||
|
@@ -1027,7 +1027,7 @@ public AbfsRestOperation read(final String path, | |
String cachedSasToken, | ||
ContextEncryptionAdapter contextEncryptionAdapter, | ||
TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
addEncryptionKeyRequestHeaders(path, requestHeaders, false, | ||
contextEncryptionAdapter, tracingContext); | ||
requestHeaders.add(new AbfsHttpHeader(RANGE, | ||
|
@@ -1053,12 +1053,22 @@ public AbfsRestOperation read(final String path, | |
return op; | ||
} | ||
|
||
public AbfsRestOperation deletePath(final String path, final boolean recursive, final String continuation, | ||
public AbfsRestOperation deletePath(final String path, final boolean recursive, | ||
final String continuation, | ||
TracingContext tracingContext) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
|
||
final List<AbfsHttpHeader> requestHeaders | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add a comment explaining what is happening, and move the = sign to this line, leaving There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added a comment. |
||
= (isPaginatedDelete(tracingContext, recursive) | ||
&& xMsVersion.compareTo(ApiVersion.AUG_03_2023) < 0) | ||
? createDefaultHeaders(ApiVersion.AUG_03_2023) | ||
: createDefaultHeaders(xMsVersion); | ||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); | ||
|
||
if (isPaginatedDelete(tracingContext, recursive)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What if this is a paginated delete but the L1126 API version test doesn't hold? Is that OK? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If this is a paginated delete, then the API version change condition will only fail if the current version is AUG_03_2023 or later. In that case we can go ahead with the current API version, as Azure APIs are backward compatible, |
||
// Add paginated query parameter | ||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_PAGINATED, TRUE); | ||
} | ||
|
||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); | ||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); | ||
String operation = recursive ? SASTokenProvider.DELETE_RECURSIVE_OPERATION : SASTokenProvider.DELETE_OPERATION; | ||
|
@@ -1130,7 +1140,7 @@ public AbfsRestOperation deleteIdempotencyCheckOp(final AbfsRestOperation op) { | |
public AbfsRestOperation setOwner(final String path, final String owner, final String group, | ||
TracingContext tracingContext) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
// JDK7 does not support PATCH, so to workaround the issue we will use | ||
// PUT and specify the real method in the X-Http-Method-Override header. | ||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, | ||
|
@@ -1160,7 +1170,7 @@ public AbfsRestOperation setOwner(final String path, final String owner, final S | |
public AbfsRestOperation setPermission(final String path, final String permission, | ||
TracingContext tracingContext) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
// JDK7 does not support PATCH, so to workaround the issue we will use | ||
// PUT and specify the real method in the X-Http-Method-Override header. | ||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, | ||
|
@@ -1190,7 +1200,7 @@ public AbfsRestOperation setAcl(final String path, final String aclSpecString, | |
public AbfsRestOperation setAcl(final String path, final String aclSpecString, final String eTag, | ||
TracingContext tracingContext) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
// JDK7 does not support PATCH, so to workaround the issue we will use | ||
// PUT and specify the real method in the X-Http-Method-Override header. | ||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, | ||
|
@@ -1223,7 +1233,7 @@ public AbfsRestOperation getAclStatus(final String path, TracingContext tracingC | |
|
||
public AbfsRestOperation getAclStatus(final String path, final boolean useUPN, | ||
TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(xMsVersion); | ||
|
||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); | ||
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL); | ||
|
@@ -1261,7 +1271,7 @@ public AbfsRestOperation checkAccess(String path, String rwx, TracingContext tra | |
AbfsRestOperationType.CheckAccess, | ||
AbfsHttpConstants.HTTP_METHOD_HEAD, | ||
url, | ||
createDefaultHeaders()); | ||
createDefaultHeaders(xMsVersion)); | ||
op.execute(tracingContext); | ||
return op; | ||
} | ||
|
@@ -1401,6 +1411,16 @@ private synchronized Boolean getIsNamespaceEnabled(TracingContext tracingContext | |
return isNamespaceEnabled; | ||
} | ||
|
||
protected Boolean getIsPaginatedDeleteEnabled() { | ||
return abfsConfiguration.isPaginatedDeleteEnabled(); | ||
} | ||
|
||
private Boolean isPaginatedDelete(TracingContext tracingContext, | ||
boolean isRecursiveDelete) throws AzureBlobFileSystemException { | ||
return getIsPaginatedDeleteEnabled() | ||
&& getIsNamespaceEnabled(tracingContext) && isRecursiveDelete; | ||
} | ||
|
||
public AuthType getAuthType() { | ||
return authType; | ||
} | ||
|
@@ -1501,7 +1521,7 @@ protected AbfsCounters getAbfsCounters() { | |
return abfsCounters; | ||
} | ||
|
||
public String getxMsVersion() { | ||
public ApiVersion getxMsVersion() { | ||
return xMsVersion; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ | |
|
||
package org.apache.hadoop.fs.azurebfs.services; | ||
|
||
import java.util.List; | ||
|
||
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; | ||
|
||
public final class AbfsClientUtils { | ||
|
@@ -31,4 +33,13 @@ public static void setIsNamespaceEnabled(final AbfsClient abfsClient, final Bool | |
public static void setEncryptionContextProvider(final AbfsClient abfsClient, final EncryptionContextProvider provider) { | ||
abfsClient.setEncryptionContextProvider(provider); | ||
} | ||
|
||
/**
 * Looks up a header by name in the given list and returns its value.
 * The list is scanned in order and the first header whose name equals
 * {@code headerName} is used.
 *
 * @param reqHeaders headers to search
 * @param headerName header name to match, compared with {@code equals}
 * @return the matching header's value, or an empty string when none matches
 */
public static String getHeaderValue(List<AbfsHttpHeader> reqHeaders, String headerName) { | ||
for (AbfsHttpHeader header : reqHeaders) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you could probably do something involving java 8 streaming/filtering here if you wanted to... |
||
if (header.getName().equals(headerName)) { | ||
return header.getValue(); | ||
} | ||
} | ||
// No header carried the requested name.
return ""; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason to cut this? I presume it means no tests are using it...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this code was not used anywhere, so I removed it.