HADOOP-19139.No GetPathStatus for opening AbfsInputStream #6699
base: trunk
Conversation
💔 -1 overall
This message was automatically generated.
…prefetch trigger on the first read call
@steveloughran, @mukund-thakur, @mehakmeet, requesting your kind review please. Thanks!
Hi @steveloughran, @mukund-thakur, @mehakmeet, a gentle reminder to kindly review this PR. Thanks!
🎊 +1 overall
This message was automatically generated.
Was going through this today. AbfsInputStream is already a complex piece of code, and these optimizations make it even more complex. Can you please explain the benefits? From what I understand, we are just trying to remove the HEAD call made while opening the InputStream? Have you done any performance benchmarks after enabling the flags?
@saxenapranav can you wait until there's a version of #6789 which adds something to hadoop-common to build an EnumSet from a comma-separated list of options? This will allow for easy extension: just add a new enum and a probe, and be consistent. I'll export it in a static method in a new class in org.apache.hadoop.util and call it from Configuration, so it'll be easy for you to pick up too. Ideally we should try for common enum names too: I'll let you start there and copy them in my work.
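A hedged sketch of the kind of utility described above, mapping a comma-separated configuration value onto an EnumSet. The names `parseEnumSet` and `OpenFileOption` are illustrative, not the actual #6789 API:

```java
import java.util.EnumSet;
import java.util.Locale;

public class EnumSetDemo {
  // Hypothetical option enum; real option names would come from hadoop-common.
  enum OpenFileOption { HEAD_OPTIMIZATION, PARQUET, WHOLE_FILE }

  // Build an EnumSet from a comma-separated list like "head.optimization, parquet".
  static <E extends Enum<E>> EnumSet<E> parseEnumSet(Class<E> clazz, String csv) {
    EnumSet<E> result = EnumSet.noneOf(clazz);
    for (String token : csv.split(",")) {
      // Normalize "head.optimization" -> "HEAD_OPTIMIZATION" before lookup.
      String name = token.trim().toUpperCase(Locale.ROOT).replace('.', '_');
      if (!name.isEmpty()) {
        result.add(Enum.valueOf(clazz, name));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    EnumSet<OpenFileOption> opts =
        parseEnumSet(OpenFileOption.class, "head.optimization, parquet");
    System.out.println(opts);
  }
}
```

Adding a new capability then only requires a new enum constant plus a probe on the parsed set.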
Hi @steveloughran, thanks for the suggestion! If I understand correctly, we want a common config (a flag enum) that can tell the filesystem about the HEAD optimization. Sounds really good. This PR is important because it removes a redundant network call from AbfsInputStream's lifecycle. The referenced PR is, I believe, still in draft mode; requesting you to kindly advise whether we should move ahead with this PR and adopt the flags in a later PR, or wait for the referenced PR. Thanks.
That PR is no longer a draft; I've wired it up enough that I'm happy with the design. Why don't you review it and consider how it would be wired up, and whether there are any tests I should add? I've put the real work in.
Thanks @saxenapranav for the optimizations. Added a few comments, please check them.
LOG.debug("EncryptionContext missing in GetPathStatus response");
throw new PathIOException(path.toString(),
    "EncryptionContext not present in GetPathStatus response headers");
if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT
Here, why do we need EncryptionType.ENCRYPTION_CONTEXT in the condition? Should the condition be like the below?
if(!abfsConfiguration.isInputStreamLazyOptimizationEnabled()){
final AbfsHttpOperation op = client.getPathStatus(relativePath, false,
tracingContext, null).getResult();
resourceType = op.getResponseHeader(
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
contentLength = Long.parseLong(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
}
Good point. Actually, when the encryptionType is ENCRYPTION_CONTEXT, the read API needs the following request headers:
- x-ms-encryption-key
- x-ms-encryption-key-sha256
To create these headers, the encryptionContext is required, which is returned by the server on getPathStatus.
Hence, in the ENCRYPTION_CONTEXT case, we need to make the getPathStatus call.
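For illustration, a standalone sketch of how such header values are typically derived: the key header carries the base64-encoded key and the sha256 header the base64-encoded SHA-256 digest of that key. The class and method names here are hypothetical, and in ABFS the key itself is derived from the server-provided encryptionContext by the encryption adapter:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class CpkHeadersDemo {
  // Value for the x-ms-encryption-key header: base64 of the raw key bytes.
  static String encodedKey(byte[] key) {
    return Base64.getEncoder().encodeToString(key);
  }

  // Value for the x-ms-encryption-key-sha256 header: base64 of SHA-256(key).
  static String encodedKeySha256(byte[] key) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      return Base64.getEncoder().encodeToString(digest.digest(key));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 unavailable", e);
    }
  }

  public static void main(String[] args) {
    byte[] key = "0123456789abcdef0123456789abcdef".getBytes(StandardCharsets.UTF_8);
    System.out.println("x-ms-encryption-key: " + encodedKey(key));
    System.out.println("x-ms-encryption-key-sha256: " + encodedKeySha256(key));
  }
}
```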
Presently, say abfsConfiguration.isInputStreamLazyOptimization is disabled; then it will enter the block even if the type is not ENCRYPTION_CONTEXT?
Would appreciate it if you could clarify: under what condition should the code below be skipped (not executed)?
final AbfsHttpOperation op = client.getPathStatus(relativePath, false,
tracingContext, null).getResult();
resourceType = op.getResponseHeader(
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
contentLength = Long.parseLong(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
I completely understand that the logic had become a bit messy and hard to read. I have refactored it; hope the new logic is more readable and understandable:
if (fileStatus instanceof VersionedFileStatus) {
VersionedFileStatus versionedFileStatus
= (VersionedFileStatus) fileStatus;
path = path.makeQualified(this.uri, path);
Preconditions.checkArgument(fileStatus.getPath().equals(path),
"Filestatus path [%s] does not match with given path [%s]",
fileStatus.getPath(), path);
resourceType = fileStatus.isFile() ? FILE : DIRECTORY;
contentLength = fileStatus.getLen();
eTag = versionedFileStatus.getVersion();
encryptionContext = versionedFileStatus.getEncryptionContext();
}
if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (encryptionContext == null) {
PathInformation pathInformation = getPathInformation(relativePath,
tracingContext);
resourceType = pathInformation.getResourceType();
contentLength = Long.parseLong(pathInformation.getContentLength());
eTag = pathInformation.getETag();
encryptionContext = pathInformation.getEncryptionContext();
if (encryptionContext == null) {
LOG.debug("EncryptionContext missing in GetPathStatus response");
throw new PathIOException(path.toString(),
"EncryptionContext not present in GetPathStatus response headers");
}
}
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
client.getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
} else if (fileStatus == null
&& !abfsConfiguration.isInputStreamLazyOptimizationEnabled()) {
PathInformation pathInformation = getPathInformation(relativePath,
tracingContext);
resourceType = pathInformation.getResourceType();
contentLength = Long.parseLong(pathInformation.getContentLength());
eTag = pathInformation.getETag();
}
if (parseIsDirectory(resourceType)) {
throw new AbfsRestOperationException(
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
"openFileForRead must be used with files and not directories",
null);
}
if (parseIsDirectory(resourceType)) {
if ((fileStatus != null || !abfsConfiguration.isInputStreamLazyOptimizationEnabled())
It's good to keep the conditions simple for code maintenance. Have you added fileStatus != null for defensive coding?
If the fileStatus != null check is really required, then please add a Java comment about the case where it becomes null.
Agreed, it's not required. Have removed it. Thanks!
@@ -1148,7 +1149,9 @@ public AbfsRestOperation read(final String path,
AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
    String.format("bytes=%d-%d", position, position + bufferLength - 1));
requestHeaders.add(rangeHeader);
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
if (!StringUtils.isEmpty(eTag)) {
Use StringUtils.isNotEmpty for better readability.
Makes sense. Taken.
return contentLength;
}
boolean getFileStatusInformationPresent() { |
- Please make it private visibility.
- How about renaming the method to hasFileStatusInfo()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken.
@@ -467,7 +566,21 @@ private boolean validate(final byte[] b, final int off, final int len)
  return true;
}

private int copyToUserBuffer(byte[] b, int off, int len){
private int copyToUserBuffer(byte[] b, int off, int len,
    final boolean isOptimizedReadWithoutContentLengthInformation){
It's relative thinking, but we can shorten the variable name isOptimizedReadWithoutContentLengthInformation to isOptimizedReadWithoutContentLengthInfo.
Taken.
}
throw new IOException(ex);
} finally {
if (!getFileStatusInformationPresent() && abfsHttpOperation != null) {
Can you move the condition !getFileStatusInformationPresent() inside initPropertiesFromReadResponseHeader? (Early-return when the info is already present, so the method is a no-op in that case.)
void initPropertiesFromReadResponseHeader(abfsHttpOperation) {
  if (getFileStatusInformationPresent()) {
    return;
  }
  //...
}
I would suggest still keeping the hasFileStatusInfo check outside, so the reader understands that this piece of code should only be called if the fileStatus is not already there.
Open to putting it in the method though. What do you think?
Thanks!
@@ -591,6 +720,31 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
  return (int) bytesRead;
}

private void initPropertiesFromReadResponseHeader(final AbfsHttpOperation op) throws IOException {
if (DIRECTORY.equals(
It's duplicated in two places. Probably you can extract a method and use it:
if (DIRECTORY.equals(
op.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE))) {
throw new FileNotFoundException(
"read must be used with files and not directories. Path: " + path);
}
contentLength = parseFromRange(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_RANGE));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
if (eTag != null && contentLength >= 0) {
fileStatusInformationPresent = true;
}
}
Makes sense. Have extracted a method validateFileResourceTypeAndParseETag that checks the resourceType and parses the ETag. It can be used in both places.
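A standalone sketch of what such an extracted helper could look like. It is simplified to take a plain header map for illustration; the real helper would operate on the AbfsHttpOperation response headers:

```java
import java.io.FileNotFoundException;
import java.util.Map;

public class ResponseHeaderDemo {
  static final String X_MS_RESOURCE_TYPE = "x-ms-resource-type";
  static final String ETAG = "ETag";

  // Fail if the path is a directory; otherwise return the ETag header value.
  static String validateFileResourceTypeAndParseETag(Map<String, String> headers,
      String path) throws FileNotFoundException {
    if ("directory".equals(headers.get(X_MS_RESOURCE_TYPE))) {
      throw new FileNotFoundException(
          "read must be used with files and not directories. Path: " + path);
    }
    return headers.get(ETAG);
  }

  public static void main(String[] args) throws FileNotFoundException {
    String eTag = validateFileResourceTypeAndParseETag(
        Map.of(X_MS_RESOURCE_TYPE, "file", ETAG, "0x8DC"), "/data/file.txt");
    System.out.println(eTag);
  }
}
```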
default config: HNS-OAuth
[ERROR] testBackoffRetryMetrics(org.apache.hadoop.fs.azurebfs.services.TestAbfsRestOperation) Time elapsed: 3.733 s <<< ERROR!
[ERROR] Tests run: 142, Failures: 0, Errors: 1, Skipped: 2
Time taken: 7 mins 24 secs.
no config: default
:::: AGGREGATED TEST RESULT ::::
jira: https://issues.apache.org/jira/browse/HADOOP-19139
The read API returns the contentLength and eTag of the path; this information is used in subsequent calls on that inputStream, so fetching the eTag up front is of little importance.
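On a ranged read, the response's Content-Range header (e.g. "bytes 0-1023/4096") carries the total file length after the slash. A hedged standalone sketch of such parsing; the class and method names here are illustrative, not the actual ABFS implementation:

```java
public class ContentRangeDemo {
  // Extract the total length from a Content-Range value like "bytes 0-1023/4096".
  // Returns -1 when the header is missing or malformed.
  static long parseFromRange(String contentRange) {
    if (contentRange == null) {
      return -1;
    }
    int slash = contentRange.lastIndexOf('/');
    if (slash < 0 || slash == contentRange.length() - 1) {
      return -1;
    }
    try {
      return Long.parseLong(contentRange.substring(slash + 1).trim());
    } catch (NumberFormatException e) {
      return -1;
    }
  }

  public static void main(String[] args) {
    System.out.println(parseFromRange("bytes 0-1023/4096")); // prints 4096
  }
}
```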
Added two configs:
1. fs.azure.input.stream.lazy.open.optimization.enabled: if this is true, the HEAD call is skipped when opening AbfsInputStream.
- default: false; as in trunk right now, the HEAD call always happens before opening AbfsInputStream.
2. fs.azure.prefetch.on.first.read.enabled: if this is false, the first read call on AbfsInputStream will not trigger a prefetch; after the first read, subsequent calls can trigger prefetches.
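As a usage sketch, the two properties could be set in core-site.xml like this (property names from this PR; the values shown are illustrative, matching the tested combination below):

```xml
<configuration>
  <!-- Skip the HEAD (GetPathStatus) call when opening AbfsInputStream. -->
  <property>
    <name>fs.azure.input.stream.lazy.open.optimization.enabled</name>
    <value>true</value>
  </property>
  <!-- Do not prefetch on the first read call. -->
  <property>
    <name>fs.azure.prefetch.on.first.read.enabled</name>
    <value>false</value>
  </property>
</configuration>
```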
TESTS done on:
fs.azure.input.stream.lazy.open.optimization.enabled = true; fs.azure.prefetch.on.first.read.enabled = false
:::: AGGREGATED TEST RESULT ::::
HNS-OAuth
[INFO] Results:
[INFO]
[WARNING] Tests run: 137, Failures: 0, Errors: 0, Skipped: 2
[INFO] Results:
[INFO]
[ERROR] Errors:
[ERROR] ITestAzureBlobFileSystemLease.testAcquireRetry:329 » TestTimedOut test timed o...
[INFO]
[ERROR] Tests run: 578, Failures: 0, Errors: 1, Skipped: 82
[INFO] Results:
[INFO]
[ERROR] Errors:
[ERROR] ITestAbfsTerasort.test_120_terasort:262->executeStage:206 » IO The ownership o...
[INFO]
[ERROR] Tests run: 380, Failures: 0, Errors: 1, Skipped: 55
HNS-SharedKey
[INFO] Results:
[INFO]
[WARNING] Tests run: 137, Failures: 0, Errors: 0, Skipped: 3
[INFO] Results:
[INFO]
[ERROR] Failures:
[ERROR] ITestAzureBlobFileSystemRandomRead.testValidateSeekBounds:278->Assert.assertTrue:42->Assert.fail:89 There should not be any network I/O (elapsedTimeMs=72).
[ERROR] Errors:
[ERROR] ITestAzureBlobFileSystemLease.testAcquireRetry:329 » TestTimedOut test timed o...
[INFO]
[ERROR] Tests run: 585, Failures: 1, Errors: 1, Skipped: 37
[INFO] Results:
[INFO]
[WARNING] Tests run: 380, Failures: 0, Errors: 0, Skipped: 41
NonHNS-SharedKey
[INFO] Results:
[INFO]
[WARNING] Tests run: 137, Failures: 0, Errors: 0, Skipped: 9
[INFO] Results:
[INFO]
[ERROR] Errors:
[ERROR] ITestAzureBlobFileSystemLease.testAcquireRetry:344->lambda$testAcquireRetry$6:345 » TestTimedOut
[ERROR] ITestAzureBlobFileSystemLease.testTwoWritersCreateAppendWithInfiniteLeaseEnabled:186->twoWriters:154 » TestTimedOut
[INFO]
[ERROR] Tests run: 569, Failures: 0, Errors: 2, Skipped: 267
[INFO] Results:
[INFO]
[WARNING] Tests run: 380, Failures: 0, Errors: 0, Skipped: 44
AppendBlob-HNS-OAuth
[INFO] Results:
[INFO]
[WARNING] Tests run: 137, Failures: 0, Errors: 0, Skipped: 2
[INFO] Results:
[INFO]
[ERROR] Failures:
[ERROR] ITestAzureBlobFileSystemRandomRead.testSkipBounds:222->Assert.assertTrue:42->Assert.fail:89 There should not be any network I/O (elapsedTimeMs=703).
[ERROR] Errors:
[ERROR] ITestAzureBlobFileSystemLease.testAcquireRetry:329 » TestTimedOut test timed o...
[INFO]
[ERROR] Tests run: 596, Failures: 1, Errors: 1, Skipped: 82
[INFO] Results:
[INFO]
[ERROR] Errors:
[ERROR] ITestAbfsTerasort.test_120_terasort:262->executeStage:206 » IO The ownership o...
[INFO]
[ERROR] Tests run: 380, Failures: 0, Errors: 1, Skipped: 55
Time taken: 44 mins 39 secs.