Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19139.No GetPathStatus for opening AbfsInputStream #6699

Open
wants to merge 76 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
e2a4e05
have a way where in the first read call set the pathStatus infomation…
saxenapranav Mar 28, 2024
8d4756b
successfulUsage as atomicInteger; readFully; readOptimization conditions
saxenapranav Mar 29, 2024
4aae0c8
done with src code
saxenapranav Mar 29, 2024
2d6f0cb
readFully optimization condition change
saxenapranav Apr 3, 2024
5939e60
successfulUsage
saxenapranav Apr 3, 2024
c8a6a00
read buffer and length set for optimized read in case of no gps info
saxenapranav Apr 3, 2024
4ad349c
read contentlen from content-range in read response
saxenapranav Apr 4, 2024
ee1f4c3
correct calc
saxenapranav Apr 4, 2024
cfc4f8e
getContentLength
saxenapranav Apr 4, 2024
cc09403
testExceptionInOptimization
saxenapranav Apr 5, 2024
6cb1773
config for head optimization in abfsinputstream, config for allowing …
saxenapranav Apr 5, 2024
bd920a5
Merge branch 'trunk' into saxenapranav/noGpsForRead
saxenapranav Apr 5, 2024
37bc691
fix in contract test if head optimization is switched on
saxenapranav Apr 8, 2024
7ecabf9
prefetch on first would also depened if Etag info is there; test smal…
saxenapranav Apr 8, 2024
2c9da9e
breaking readOptimized loop; getContentLength()
saxenapranav Apr 8, 2024
58a399c
importnat change in abfsinputStream; tesch changes wip
saxenapranav Apr 10, 2024
7d22917
fixed test in Footer testclass
saxenapranav Apr 10, 2024
52e19c0
small read test fixed
saxenapranav Apr 10, 2024
8c76674
readFooter test refactor
saxenapranav Apr 10, 2024
a06a2a1
fixed failing tests; footer optimization can start on fcursor ==0 as …
saxenapranav Apr 10, 2024
8f44c96
fix failing tests
saxenapranav Apr 11, 2024
b66eea6
AbfsRestOperationException now contains the ref to abfsHttpOperation …
saxenapranav Apr 11, 2024
45a0796
test fix wip, when readOptimizad fail with FileNotFound, dont do read…
saxenapranav Apr 11, 2024
b1fd443
isNonRetriableOptimizedReadException
saxenapranav Apr 11, 2024
38f5592
switch off DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM
saxenapranav Apr 12, 2024
074fb39
switching off DEFAULT_PREFETCH_READAHEAD_ON_FIRST_READ
saxenapranav Apr 12, 2024
99e397c
giving sync access to fileStatusInformationPresent. for un-sync metho…
saxenapranav Apr 12, 2024
fe1df56
spotbugs issue
saxenapranav Apr 12, 2024
0475b3e
Fixed readahead contract seek test for the prefetch config
saxenapranav Apr 12, 2024
a1c56c2
added test for small read with the head optimization changes
saxenapranav Apr 12, 2024
dc898c8
added test for readFooter if it can read invalid data.
saxenapranav Apr 12, 2024
9e7bb6c
fix the failing test
saxenapranav Apr 12, 2024
cb929f9
remove non-required assume
saxenapranav Apr 12, 2024
e7b121a
spotbugs; checkstyle
saxenapranav Apr 14, 2024
51373f0
default configs done to what is there on OSS trunk; correction of SAS…
saxenapranav Apr 14, 2024
9b998d4
added comments on the metric calculations for better code readablity
saxenapranav Apr 14, 2024
7d1a274
update in filesystem.md that fs.open might be lazy open and nonexiste…
saxenapranav Apr 14, 2024
cd9b0de
directory check and not found file check in inputStream
saxenapranav Apr 14, 2024
0c454d7
eol fix
saxenapranav Apr 14, 2024
5c3553b
Merge branch 'trunk' into saxenapranav/noGpsForRead
saxenapranav Apr 16, 2024
b224fa2
no atomicBoolean required
saxenapranav Apr 16, 2024
2fc5bbd
etag condition in header add; check if fileStatus provided which chec…
saxenapranav Apr 16, 2024
e7cfec5
remove same code method from the 3 contract class; has been braught i…
saxenapranav Apr 17, 2024
2f51126
try catch for fs.open mock in AbfsFileSystemContract
saxenapranav Apr 17, 2024
02b3421
review comments
saxenapranav Apr 17, 2024
4c08320
review comments
saxenapranav Apr 17, 2024
f62f868
review comments; exception catching only on FileNotFoundException
saxenapranav Apr 18, 2024
36ba7c0
review comments
saxenapranav Apr 18, 2024
7978c27
reduce git diff
saxenapranav Apr 18, 2024
8d38aed
added comments for better understanding
saxenapranav Apr 18, 2024
811693d
arraycopy conditioning for optimizedRead; added test for it
saxenapranav Apr 18, 2024
065919a
changes in durationtracker reverted
saxenapranav Apr 18, 2024
9e73700
compile issue
saxenapranav Apr 18, 2024
1b3ba1b
Revert "changes in durationtracker reverted"
saxenapranav Apr 18, 2024
f4fef33
made ITestAbfsDurationTrackers run sequential; assertion revert revert
saxenapranav Apr 18, 2024
caf6c56
Revert "made ITestAbfsDurationTrackers run sequential; assertion reve…
saxenapranav Apr 18, 2024
480a705
Revert "Revert "changes in durationtracker reverted""
saxenapranav Apr 18, 2024
e7cd9a3
greater than equal to comparison
saxenapranav Apr 18, 2024
4154485
checkstyle
saxenapranav Apr 18, 2024
caa0756
new configs set to the default as in trunk
saxenapranav Apr 19, 2024
5ddf14a
javadocs; comments refactor
saxenapranav Apr 19, 2024
0b53e1e
corrected getter method name in abfsConfiguration for new configs
saxenapranav Apr 19, 2024
bd86173
assertion change for readFullFile config; added comments
saxenapranav Apr 22, 2024
44ffeb3
checkstyles
saxenapranav Apr 22, 2024
01239aa
Merge branch 'trunk' into saxenapranav/noGpsForRead
saxenapranav Apr 23, 2024
ffefdb3
added comment for better understanding on why throwing FileNotFound i…
saxenapranav Apr 25, 2024
20f4f2b
Merge branch 'trunk' into saxenapranav/noGpsForRead
saxenapranav May 6, 2024
0a1b23c
Merge branch 'trunk' into saxenapranav/noGpsForRead
saxenapranav Jul 8, 2024
19c1d94
review refactor
saxenapranav Jul 15, 2024
429a518
spotbugs
saxenapranav Jul 16, 2024
d096f3d
abfs.md doc; refactored to simplify openFileForRead for HEAD call sce…
saxenapranav Jul 17, 2024
9231959
remove synchronized blocks and let native long contentLength as volatile
saxenapranav Jul 17, 2024
4547f53
code fix
saxenapranav Jul 17, 2024
4df6b02
code fix
saxenapranav Jul 17, 2024
c8299fc
fix, abfsHttpOp only on success case and not from exception
saxenapranav Jul 19, 2024
6de6b88
checkstyle
saxenapranav Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,9 @@ Implementations without a compliant call SHOULD throw `UnsupportedOperationExcep

This is a critical precondition. Implementations of some FileSystems (e.g.
Object stores) could shortcut one round trip by postponing their HTTP GET
operation until the first `read()` on the returned `FSDataInputStream`.
operation until the first `read()` on the returned `FSDataInputStream`. This
lazy open may not surface file non-existence or access permission failures
until the first `read()` of the actual data.
However, much client code does depend on the existence check being performed
at the time of the `open()` operation. Implementations MUST check for the
presence of the file at the time of creation. This does not imply that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,14 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED,
DefaultValue = DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM)
private boolean isInputStreamLazyOptimizationEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_PREFETCH_ON_FIRST_READ_ENABLED,
DefaultValue = DEFAULT_PREFETCH_READAHEAD_ON_FIRST_READ)
private boolean isPrefetchOnFirstReadEnabled;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

Expand Down Expand Up @@ -1338,6 +1346,14 @@ public boolean getIsChecksumValidationEnabled() {
return isChecksumValidationEnabled;
}

public boolean isInputStreamLazyOptimizationEnabled() {
return isInputStreamLazyOptimizationEnabled;
}

public boolean isPrefetchOnFirstReadEnabled() {
return isPrefetchOnFirstReadEnabled;
}

@VisibleForTesting
public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) {
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,63 +856,51 @@ public AbfsInputStream openFileForRead(Path path,
LOG.debug("openFileForRead filesystem: {} path: {}",
client.getFileSystem(), path);

FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
final FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
.orElse(null);
String relativePath = getRelativePath(path);
String resourceType, eTag;
long contentLength;
String resourceType = null, eTag = null;
rakeshadr marked this conversation as resolved.
Show resolved Hide resolved
long contentLength = -1;
ContextEncryptionAdapter contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
/*
* GetPathStatus API has to be called in case of:
* 1. fileStatus is null or not an object of VersionedFileStatus: as eTag
* would not be there in the fileStatus object.
* 2. fileStatus is an object of VersionedFileStatus and the object doesn't
* have encryptionContext field when client's encryptionType is
* ENCRYPTION_CONTEXT.
*/
if ((fileStatus instanceof VersionedFileStatus) && (
client.getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT
|| ((VersionedFileStatus) fileStatus).getEncryptionContext()
!= null)) {
String encryptionContext = null;
if (fileStatus instanceof VersionedFileStatus) {
VersionedFileStatus versionedFileStatus
= (VersionedFileStatus) fileStatus;
path = path.makeQualified(this.uri, path);
Preconditions.checkArgument(fileStatus.getPath().equals(path),
String.format(
"Filestatus path [%s] does not match with given path [%s]",
fileStatus.getPath(), path));
"Filestatus path [%s] does not match with given path [%s]",
fileStatus.getPath(), path);
resourceType = fileStatus.isFile() ? FILE : DIRECTORY;
contentLength = fileStatus.getLen();
eTag = ((VersionedFileStatus) fileStatus).getVersion();
final String encryptionContext
= ((VersionedFileStatus) fileStatus).getEncryptionContext();
if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
client.getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
}
} else {
AbfsHttpOperation op = client.getPathStatus(relativePath, false,
tracingContext, null).getResult();
resourceType = op.getResponseHeader(
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
contentLength = Long.parseLong(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
/*
* For file created with ENCRYPTION_CONTEXT, client shall receive
* encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
*/
if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
final String fileEncryptionContext = op.getResponseHeader(
HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
if (fileEncryptionContext == null) {
eTag = versionedFileStatus.getVersion();
encryptionContext = versionedFileStatus.getEncryptionContext();
}

if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (encryptionContext == null) {
FileStatusInternal fileStatusInternal = getFileStatusInternal(relativePath,
tracingContext);
resourceType = fileStatusInternal.getResourceType();
contentLength = Long.parseLong(fileStatusInternal.getContentLength());
eTag = fileStatusInternal.getETag();
encryptionContext = fileStatusInternal.getEncryptionContext();

if (encryptionContext == null) {
LOG.debug("EncryptionContext missing in GetPathStatus response");
throw new PathIOException(path.toString(),
"EncryptionContext not present in GetPathStatus response headers");
}
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
client.getEncryptionContextProvider(), getRelativePath(path),
fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
}
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
client.getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
} else if (fileStatus == null
&& !abfsConfiguration.isInputStreamLazyOptimizationEnabled()) {
FileStatusInternal fileStatusInternal = getFileStatusInternal(relativePath,
tracingContext);
resourceType = fileStatusInternal.getResourceType();
contentLength = Long.parseLong(fileStatusInternal.getContentLength());
eTag = fileStatusInternal.getETag();
}

if (parseIsDirectory(resourceType)) {
Expand All @@ -934,6 +922,35 @@ contentLength, populateAbfsInputStreamContext(
}
}

/**
* Calls pathStatus API on the path and returns the FileStatusInternal which
* contains the ETag, ContentLength, ResourceType and EncryptionContext parsed
* from the API response.
*
* @param relativePath Path to get the status of.
* @param tracingContext TracingContext instance.
*
* @return FileStatusInternal instance containing the ETag, ContentLength,
* ResourceType and EncryptionContext of the path.
* @throws AzureBlobFileSystemException server error.
*/
private FileStatusInternal getFileStatusInternal(String relativePath,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsRestOperation op = client.getPathStatus(relativePath, false,
tracingContext, null);
String contentLength = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
String resourceType = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
String encryptionContext = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);

return new FileStatusInternal(eTag, contentLength, resourceType,
encryptionContext);
}

private AbfsInputStreamContext populateAbfsInputStreamContext(
Optional<Configuration> options, ContextEncryptionAdapter contextEncryptionAdapter) {
boolean bufferedPreadDisabled = options
Expand All @@ -958,6 +975,8 @@ AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize())
.withBufferedPreadDisabled(bufferedPreadDisabled)
.withEncryptionAdapter(contextEncryptionAdapter)
.withAbfsBackRef(fsBackRef)
.withPrefetchTriggerOnFirstRead(
abfsConfiguration.isPrefetchOnFirstReadEnabled())
.build();
}

Expand Down Expand Up @@ -2058,6 +2077,36 @@ public String toString() {
}
}

private static final class FileStatusInternal {
private String eTag;
private String contentLength;
private String resourceType;
private String encryptionContext;

private FileStatusInternal(String eTag, String contentLength, String resourceType, String encryptionContext) {
this.eTag = eTag;
this.contentLength = contentLength;
this.resourceType = resourceType;
this.encryptionContext = encryptionContext;
}

public String getETag() {
return eTag;
}

public String getContentLength() {
return contentLength;
}

public String getResourceType() {
return resourceType;
}

public String getEncryptionContext() {
return encryptionContext;
}
}

/**
* Permissions class contain provided permission and umask in octalNotation.
* If the object is created for namespace-disabled account, the permission and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public final class AbfsHttpConstants {
public static final String HTTP_HEADER_PREFIX = "x-ms-";
public static final String HASH = "#";
public static final String TRUE = "true";
public static final String FALSE = "false";

public static final String PLUS_ENCODE = "%20";
public static final String FORWARD_SLASH_ENCODE = "%2F";
Expand Down Expand Up @@ -169,6 +170,8 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;

public static final Integer READ_PATH_REQUEST_NOT_SATISFIABLE = 416;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we name this as per format followed by other HTTP Status code constants:
HTTP_RANGE_NOT_SATISFIABLE

Copy link
Contributor Author

@saxenapranav saxenapranav Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

416 is not given constant in HttpUrlConnection class. Hence, maintaining a new constant in AbfsHttpConstants

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. But what I was proposing is to name this new variable as per the format followed by HttpUrlConnection class.

What are your thoughts??


/**
* List of configurations that are related to Customer-Provided-Keys.
* <ol>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,5 +321,21 @@ public static String accountProperty(String property, String account) {
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";

/**
* Enable lazy opening of an inputStream. Lazy opening would not call HEAD call
* to get file metadata before creating inputStream. ReadPath API of server
* would give the contentLength and eTag which would be used in subsequent calls
* for if-match headers.
*/
public static final String
FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED = "fs.azure.input.stream.lazy.open.optimization.enabled";

/**
* Enable prefetch on the first read to {@link org.apache.hadoop.fs.azurebfs.services.AbfsInputStream}.
* If disabled, first call would not trigger prefetch. Prefetch would be switched on
rakeshadr marked this conversation as resolved.
Show resolved Hide resolved
* after first read call.
*/
public static final String FS_AZURE_PREFETCH_ON_FIRST_READ_ENABLED = "fs.azure.prefetch.on.first.read.enabled";
private ConfigurationKeys() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,9 @@ public final class FileSystemConfigurations {
public static final int HUNDRED = 100;
public static final long THOUSAND = 1000L;

public static final boolean DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM = false;

public static final boolean DEFAULT_PREFETCH_READAHEAD_ON_FIRST_READ = true;

private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class HttpHeaderConfigurations {
public static final String IF_MATCH = "If-Match";
public static final String IF_NONE_MATCH = "If-None-Match";
public static final String CONTENT_LENGTH = "Content-Length";
public static final String CONTENT_RANGE = "Content-Range";
rakeshadr marked this conversation as resolved.
Show resolved Hide resolved
public static final String CONTENT_ENCODING = "Content-Encoding";
public static final String CONTENT_LANGUAGE = "Content-Language";
public static final String CONTENT_MD5 = "Content-MD5";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class AbfsRestOperationException extends AzureBlobFileSystemException {
private final int statusCode;
private final AzureServiceErrorCode errorCode;
private final String errorMessage;
private final AbfsHttpOperation abfsHttpOperation;

public AbfsRestOperationException(
final int statusCode,
Expand All @@ -45,6 +46,7 @@ public AbfsRestOperationException(
this.statusCode = statusCode;
this.errorCode = AzureServiceErrorCode.getAzureServiceCode(this.statusCode, errorCode);
this.errorMessage = errorMessage;
this.abfsHttpOperation = null;
}

public AbfsRestOperationException(
Expand All @@ -55,6 +57,7 @@ public AbfsRestOperationException(
final AbfsHttpOperation abfsHttpOperation) {
super(formatMessage(abfsHttpOperation), innerException);

this.abfsHttpOperation = abfsHttpOperation;
this.statusCode = statusCode;
this.errorCode = AzureServiceErrorCode.getAzureServiceCode(this.statusCode, errorCode);
this.errorMessage = errorMessage;
Expand All @@ -66,6 +69,7 @@ public AbfsRestOperationException(final HttpException innerException) {
this.statusCode = innerException.getHttpErrorCode();
this.errorCode = AzureServiceErrorCode.UNKNOWN;
this.errorMessage = innerException.getMessage();
this.abfsHttpOperation = null;
}

public int getStatusCode() {
Expand All @@ -80,6 +84,10 @@ public String getErrorMessage() {
return this.errorMessage;
}

public AbfsHttpOperation getAbfsHttpOperation() {
return this.abfsHttpOperation;
}

private static String formatMessage(final AbfsHttpOperation abfsHttpOperation) {
// HEAD request response doesn't have StorageErrorCode, StorageErrorMessage.
if (abfsHttpOperation.getMethod().equals("HEAD")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
Expand Down Expand Up @@ -1148,7 +1149,9 @@ public AbfsRestOperation read(final String path,
AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1));
requestHeaders.add(rangeHeader);
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
if (StringUtils.isNotEmpty(eTag)) {
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
}

// Add request header to fetch MD5 Hash of data returned by server.
if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
Expand Down
Loading
Loading