Skip to content

Commit

Permalink
HADOOP-19044. S3A: AWS SDK V2 - Update region logic (#6479)
Browse files Browse the repository at this point in the history
Improves region handling in the S3A connector, including enabling cross-region support
when that is considered necessary.

Consult the documentation in connecting.md/connecting.html for the current
resolution process.

Contributed by Viraj Jasani
  • Loading branch information
virajjasani authored Feb 2, 2024
1 parent 7504b85 commit d278b34
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,10 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
*/
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void configureEndpointAndRegion(
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);
final String endpointStr = parameters.getEndpoint();
final URI endpoint = getS3Endpoint(endpointStr, conf);

String configuredRegion = parameters.getRegion();
final String configuredRegion = parameters.getRegion();
Region region = null;
String origin = "";

Expand All @@ -291,15 +292,33 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void
if (endpoint != null) {
checkArgument(!fipsEnabled,
"%s : %s", ERROR_ENDPOINT_WITH_FIPS, endpoint);
builder.endpointOverride(endpoint);
// No region was configured, try to determine it from the endpoint.
boolean endpointEndsWithCentral =
endpointStr.endsWith(CENTRAL_ENDPOINT);

// No region was configured,
// determine the region from the endpoint.
if (region == null) {
region = getS3RegionFromEndpoint(parameters.getEndpoint());
region = getS3RegionFromEndpoint(endpointStr,
endpointEndsWithCentral);
if (region != null) {
origin = "endpoint";
}
}
LOG.debug("Setting endpoint to {}", endpoint);

// No need to override endpoint with "s3.amazonaws.com".
// Let the client take care of endpoint resolution. Overriding
// the endpoint with "s3.amazonaws.com" causes 400 Bad Request
// errors for non-existent buckets and objects.
// ref: https://github.com/aws/aws-sdk-java-v2/issues/4846
if (!endpointEndsWithCentral) {
builder.endpointOverride(endpoint);
LOG.debug("Setting endpoint to {}", endpoint);
} else {
builder.crossRegionAccessEnabled(true);
origin = "central endpoint with cross region access";
LOG.debug("Enabling cross region access for endpoint {}",
endpointStr);
}
}

if (region != null) {
Expand Down Expand Up @@ -354,20 +373,32 @@ private static URI getS3Endpoint(String endpoint, final Configuration conf) {

/**
* Parses the endpoint to get the region.
* If endpoint is the central one, use US_EAST_1.
* If endpoint is the central one, use US_EAST_2.
*
* @param endpoint the configured endpoint.
* @param endpointEndsWithCentral true if the endpoint is configured as central.
* @return the S3 region, null if unable to resolve from endpoint.
*/
private static Region getS3RegionFromEndpoint(String endpoint) {
private static Region getS3RegionFromEndpoint(final String endpoint,
final boolean endpointEndsWithCentral) {

if(!endpoint.endsWith(CENTRAL_ENDPOINT)) {
if (!endpointEndsWithCentral) {
LOG.debug("Endpoint {} is not the default; parsing", endpoint);
return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null);
}

// endpoint is for US_EAST_1;
return Region.US_EAST_1;
// Select default region here to enable cross-region access.
// If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty,
// Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com".
// This applies to Spark versions with the changes of SPARK-35878.
// ref:
// https://github.com/apache/spark/blob/v3.5.0/core/
// src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528
// If we do not allow cross region access, Spark would not be able to
// access any bucket that is not present in the given region.
// Hence, we should use default region us-east-2 to allow cross-region
// access.
return Region.of(AWS_S3_DEFAULT_REGION);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ The table below lists the configurations S3A was using and what they now map to.

Previously, if no endpoint and region were configured, S3A fell back to using us-east-1 and set
withForceGlobalBucketAccessEnabled(true), which allowed access to buckets not in this region too.
Since the SDK V2 no longer supports cross region access, we need to set the region and endpoint of
the bucket. The behaviour has now been changed to:
Since the SDK V2 no longer supports cross region access, we need to set the region and
endpoint of the bucket. The behaviour has now been changed to:

* If no endpoint is specified, use s3.amazonaws.com.
* When setting the endpoint, also set the protocol (HTTP or HTTPS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,42 @@ With the move to the AWS V2 SDK, there is more emphasis on the region, set by th

Normally, declaring the region in `fs.s3a.endpoint.region` should be sufficient to set up the network connection to correctly connect to an AWS-hosted S3 store.

### <a name="s3_endpoint_region_details"></a> S3 endpoint and region settings in detail

* Configs `fs.s3a.endpoint` and `fs.s3a.endpoint.region` are used to set values
for S3 endpoint and region respectively.
* If `fs.s3a.endpoint.region` is configured with a valid AWS region value, S3A will
configure the S3 client to use this value. If this is set to a region that does
not match your bucket, you will receive a 301 redirect response.
* If `fs.s3a.endpoint.region` is not set and `fs.s3a.endpoint` is set to a valid
endpoint value, S3A will attempt to parse the region from the endpoint and
configure the S3 client to use that region.
* If both `fs.s3a.endpoint` and `fs.s3a.endpoint.region` are not set, S3A will
use `us-east-2` as default region and enable cross region access. In this case,
S3A does not attempt to override the endpoint while configuring the S3 client.
* If `fs.s3a.endpoint` is not set and `fs.s3a.endpoint.region` is set to an empty
string, S3A will configure S3 client without any region or endpoint override.
This will allow fallback to S3 SDK region resolution chain. More details
[here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* If `fs.s3a.endpoint` is set to central endpoint `s3.amazonaws.com` and
`fs.s3a.endpoint.region` is not set, S3A will use `us-east-2` as default region
and enable cross region access. In this case, S3A does not attempt to override
the endpoint while configuring the S3 client.
* If `fs.s3a.endpoint` is set to central endpoint `s3.amazonaws.com` and
`fs.s3a.endpoint.region` is also set to some region, S3A will use that region
value and enable cross region access. In this case, S3A does not attempt to
override the endpoint while configuring the S3 client.

When cross region access is enabled while configuring the S3 client, the S3 SDK determines
the correct region even if the configured region is incorrect. It does this by making the
request and, if it receives a 301 redirect response, determining the bucket's region at
the cost of a HEAD request and caching it.

Please note that some endpoint and region settings that require cross region access
are complex and improving over time. Hence, they may be considered unstable.

If you are working with third party stores, please check [third party stores in detail](third_party_stores.html).

### <a name="timeouts"></a> Network timeouts

See [Timeouts](performance.html#timeouts).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ If you do any of these: change your credentials immediately!

## Connecting to Amazon S3 or a third-party store

See [Connecting to an Amazon S3 Bucket through the S3A Connector](connecting.md).
See [Connecting to an Amazon S3 Bucket through the S3A Connector](connecting.html).

Also, please check [S3 endpoint and region settings in detail](connecting.html#s3_endpoint_region_details).

## <a name="authenticating"></a> Authenticating with S3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,20 @@
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils;

import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.DEFAULT_REQUESTER_PAYS_BUCKET_NAME;
import static org.apache.hadoop.io.IOUtils.closeStream;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

Expand Down Expand Up @@ -146,11 +153,28 @@ public void testCentralEndpoint() throws Throwable {
describe("Create a client with the central endpoint");
Configuration conf = getConfiguration();

S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_1, false);
S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_2, false);

expectInterceptorException(client);
}

/**
 * Create clients against the central endpoint with an explicit region,
 * once for US_WEST_2 and once for US_EAST_1, and run each through
 * {@code expectInterceptorException}.
 */
@Test
public void testCentralEndpointWithRegion() throws Throwable {
  describe("Create a client with the central endpoint but also specify region");
  final Configuration configuration = getConfiguration();

  // The same value is supplied for both region arguments; presumably these
  // are (configured region, expected region) -- confirm against createS3Client.
  final S3Client westClient = createS3Client(
      configuration, CENTRAL_ENDPOINT, US_WEST_2, US_WEST_2, false);
  expectInterceptorException(westClient);

  final S3Client eastClient = createS3Client(
      configuration, CENTRAL_ENDPOINT, US_EAST_1, US_EAST_1, false);
  expectInterceptorException(eastClient);
}

@Test
public void testWithRegionConfig() throws Throwable {
describe("Create a client with a configured region");
Expand Down Expand Up @@ -257,6 +281,141 @@ public void testWithVPCE() throws Throwable {
expectInterceptorException(client);
}

/**
 * Probe an object in the public requester-pays bucket through a filesystem
 * configured with the central endpoint and the EU_WEST_1 region, which the
 * test description states is not the bucket's own region.
 */
@Test
public void testCentralEndpointAndDifferentRegionThanBucket() throws Throwable {
  describe("Access public bucket using central endpoint and region "
      + "different than that of the public bucket");
  final Configuration newConf = new Configuration(getConfiguration());

  // Clear the options first, both the base ones and those scoped to the
  // requester pays bucket (presumably so inherited settings cannot leak in).
  removeBaseAndBucketOverrides(newConf,
      ENDPOINT, AWS_REGION, ALLOW_REQUESTER_PAYS, KEY_REQUESTER_PAYS_FILE);
  removeBaseAndBucketOverrides(DEFAULT_REQUESTER_PAYS_BUCKET_NAME, newConf,
      ENDPOINT, AWS_REGION, ALLOW_REQUESTER_PAYS, KEY_REQUESTER_PAYS_FILE);

  // Central endpoint paired with a region other than the bucket's.
  newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
  newConf.set(AWS_REGION, EU_WEST_1);
  newConf.set(ENDPOINT, CENTRAL_ENDPOINT);

  final Path filePath =
      new Path(PublicDatasetTestUtils.getRequesterPaysObject(newConf));
  newFS = (S3AFileSystem) filePath.getFileSystem(newConf);

  final boolean found = newFS.exists(filePath);
  Assertions.assertThat(found)
      .describedAs("Existence of path: " + filePath)
      .isTrue();
}

/**
 * Probe an object in the public requester-pays bucket through a filesystem
 * configured with the central endpoint and the US_WEST_2 region, which the
 * test description states is the bucket's own region.
 */
@Test
public void testCentralEndpointAndSameRegionAsBucket() throws Throwable {
  describe("Access public bucket using central endpoint and region "
      + "same as that of the public bucket");
  final Configuration newConf = new Configuration(getConfiguration());

  // Clear the options first, both the base ones and those scoped to the
  // requester pays bucket (presumably so inherited settings cannot leak in).
  removeBaseAndBucketOverrides(newConf,
      ENDPOINT, AWS_REGION, ALLOW_REQUESTER_PAYS, KEY_REQUESTER_PAYS_FILE);
  removeBaseAndBucketOverrides(DEFAULT_REQUESTER_PAYS_BUCKET_NAME, newConf,
      ENDPOINT, AWS_REGION, ALLOW_REQUESTER_PAYS, KEY_REQUESTER_PAYS_FILE);

  // Central endpoint paired with the bucket's own region.
  newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
  newConf.set(AWS_REGION, US_WEST_2);
  newConf.set(ENDPOINT, CENTRAL_ENDPOINT);

  final Path filePath =
      new Path(PublicDatasetTestUtils.getRequesterPaysObject(newConf));
  newFS = (S3AFileSystem) filePath.getFileSystem(newConf);

  final boolean found = newFS.exists(filePath);
  Assertions.assertThat(found)
      .describedAs("Existence of path: " + filePath)
      .isTrue();
}

/**
 * Initialize a second filesystem on the test bucket with only the central
 * endpoint configured (no region set), then run the create/read/delete
 * checks in {@code assertOpsUsingNewFs()} against it.
 */
@Test
public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable {
  describe("Access the test bucket using central endpoint and"
      + " null region, perform file system CRUD operations");
  final Configuration newConf = new Configuration(getConfiguration());

  // Clear endpoint and region, then set the endpoint alone; the region is
  // deliberately left unset.
  removeBaseAndBucketOverrides(newConf, ENDPOINT, AWS_REGION);
  newConf.set(ENDPOINT, CENTRAL_ENDPOINT);

  // Bind the new instance to the same URI as the test's main filesystem.
  newFS = new S3AFileSystem();
  newFS.initialize(getFileSystem().getUri(), newConf);

  assertOpsUsingNewFs();
}

/**
 * Exercise {@code newFS} with a small create/read/delete cycle under the
 * method path, checking after the write and after the delete that both
 * {@code newFS} and the filesystem from {@code getFileSystem()} agree on
 * whether the file exists.
 *
 * @throws IOException on any filesystem failure.
 */
private void assertOpsUsingNewFs() throws IOException {
  final String fileName = getMethodName();
  final Path baseDir = methodPath();
  final Path srcDir = new Path(baseDir, "srcdir");
  newFS.mkdirs(srcDir);
  final Path srcFilePath = new Path(srcDir, fileName);

  // Write a three byte payload through the new filesystem.
  final byte[] payload = {1, 2, 3};
  try (FSDataOutputStream out = newFS.create(srcFilePath)) {
    out.write(payload);
  }

  // The file must be visible through both filesystem instances.
  final String existsDescription = "Existence of file: " + srcFilePath;
  final boolean visibleInNewFs = newFS.exists(srcFilePath);
  Assertions.assertThat(visibleInNewFs)
      .describedAs(existsDescription)
      .isTrue();
  final boolean visibleInOriginalFs = getFileSystem().exists(srcFilePath);
  Assertions.assertThat(visibleInOriginalFs)
      .describedAs(existsDescription)
      .isTrue();

  // Read the payload back through the new filesystem and compare.
  final byte[] buffer = new byte[3];
  try (FSDataInputStream in = newFS.open(srcFilePath)) {
    in.readFully(buffer);
    Assertions.assertThat(buffer)
        .describedAs("Contents read from " + srcFilePath)
        .containsExactly(1, 2, 3);
  }

  // Recursively delete the directory; the file must vanish from both views.
  newFS.delete(srcDir, true);

  final boolean remainsInNewFs = newFS.exists(srcFilePath);
  Assertions.assertThat(remainsInNewFs)
      .describedAs("Existence of file: " + srcFilePath + " using new FS")
      .isFalse();
  final boolean remainsInOriginalFs = getFileSystem().exists(srcFilePath);
  Assertions.assertThat(remainsInOriginalFs)
      .describedAs("Existence of file: " + srcFilePath + " using original FS")
      .isFalse();
}

private final class RegionInterceptor implements ExecutionInterceptor {
private final String endpoint;
private final String region;
Expand All @@ -272,7 +431,7 @@ private final class RegionInterceptor implements ExecutionInterceptor {
public void beforeExecution(Context.BeforeExecution context,
ExecutionAttributes executionAttributes) {

if (endpoint != null) {
if (endpoint != null && !endpoint.endsWith(CENTRAL_ENDPOINT)) {
Assertions.assertThat(
executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
.describedAs("Endpoint not overridden").isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ private PublicDatasetTestUtils() {}
private static final String DEFAULT_REQUESTER_PAYS_FILE
= "s3a://usgs-landsat/collection02/catalog.json";

/**
 * Default bucket name for the requester pays bucket.
 * This is the bucket component of the {@code DEFAULT_REQUESTER_PAYS_FILE}
 * URI ({@code s3a://usgs-landsat/...}); keep the two in sync if either changes.
 * Value = {@value}.
 */
public static final String DEFAULT_REQUESTER_PAYS_BUCKET_NAME =
"usgs-landsat";

/**
* Default bucket for an S3A file system with many objects: {@value}.
*
Expand Down

0 comments on commit d278b34

Please sign in to comment.