diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 4408cf68a451e..744146ccf4f37 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -337,16 +337,33 @@ private Constants() {
public static final int DEFAULT_SOCKET_TIMEOUT = (int)DEFAULT_SOCKET_TIMEOUT_DURATION.toMillis();
/**
- * Time until a request is timed-out: {@value}.
- * If zero, there is no timeout.
+ * How long should the SDK retry/wait on a response from an S3 store: {@value}
+ * including the time needed to sign the request.
+ *
+ * This is time to response, so for a GET request it is "time to 200 response"
+ * not the time limit to download the requested data.
+ * This makes it different from {@link #REQUEST_TIMEOUT}, which is for total
+ * HTTP request.
+ *
+ * Default unit is milliseconds.
+ *
+ * There is a minimum duration set in {@link #MINIMUM_NETWORK_OPERATION_DURATION};
+ * it is impossible to set a delay less than this, even for testing.
+ * Why so? Too many deployments where the configuration assumed the timeout was in seconds
+ * and that "120" was a reasonable value rather than "too short to work reliably"
+ *
+ * Note for anyone writing tests which need to set a low value for this:
+ * to avoid the minimum duration overrides, call
+ * {@code AWSClientConfig.setMinimumOperationDuration()} and set a low value
+ * before creating the filesystem.
*/
public static final String REQUEST_TIMEOUT =
"fs.s3a.connection.request.timeout";
/**
- * Default duration of a request before it is timed out: Zero.
+ * Default duration of a request before it is timed out: 60s.
*/
- public static final Duration DEFAULT_REQUEST_TIMEOUT_DURATION = Duration.ZERO;
+ public static final Duration DEFAULT_REQUEST_TIMEOUT_DURATION = Duration.ofSeconds(60);
/**
* Default duration of a request before it is timed out: Zero.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomHttpSigner.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomHttpSigner.java
index ba1169a5e5987..528414b63e32e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomHttpSigner.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomHttpSigner.java
@@ -40,7 +40,7 @@
* fs.s3a.http.signer.class = org.apache.hadoop.fs.s3a.auth.CustomHttpSigner
*
*/
-public final class CustomHttpSigner implements HttpSigner {
+public class CustomHttpSigner implements HttpSigner {
private static final Logger LOG = LoggerFactory
.getLogger(CustomHttpSigner.class);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java
index f6da9d84e0a77..60729ac30866a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java
@@ -577,7 +577,7 @@ static ClientSettings createApiConnectionSettings(Configuration conf) {
/**
* Build the HTTP connection settings object from the configuration.
- * All settings are calculated, including the api call timeout.
+ * All settings are calculated.
* @param conf configuration to evaluate
* @return connection settings.
*/
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index 8787fca431cc7..73bba9d62cbd8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -23,6 +23,7 @@
import java.net.ConnectException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
+import java.time.Duration;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
@@ -49,6 +50,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.auth.STSClientFactory;
+import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -435,16 +437,22 @@ public void testCustomUserAgent() throws Exception {
@Test
public void testRequestTimeout() throws Exception {
conf = new Configuration();
- skipIfCrossRegionClient(conf);
- conf.set(REQUEST_TIMEOUT, "120");
- fs = S3ATestUtils.createTestFileSystem(conf);
- S3Client s3 = getS3Client("Request timeout (ms)");
- SdkClientConfiguration clientConfiguration = getField(s3, SdkClientConfiguration.class,
- "clientConfiguration");
- assertEquals("Configured " + REQUEST_TIMEOUT +
- " is different than what AWS sdk configuration uses internally",
- 120000,
- clientConfiguration.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT).toMillis());
+ // remove the safety check on minimum durations.
+ AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
+ try {
+ Duration timeout = Duration.ofSeconds(120);
+ conf.set(REQUEST_TIMEOUT, timeout.getSeconds() + "s");
+ fs = S3ATestUtils.createTestFileSystem(conf);
+ S3Client s3 = getS3Client("Request timeout (ms)");
+ SdkClientConfiguration clientConfiguration = getField(s3, SdkClientConfiguration.class,
+ "clientConfiguration");
+ Assertions.assertThat(clientConfiguration.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT))
+ .describedAs("Configured " + REQUEST_TIMEOUT +
+ " is different than what AWS sdk configuration uses internally")
+ .isEqualTo(timeout);
+ } finally {
+ AWSClientConfig.resetMinimumOperationDuration();
+ }
}
@Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index ed1fda316dfe5..e7ea920d8a0a0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -550,6 +550,16 @@ public static void skipIfS3ExpressBucket(
!isS3ExpressTestBucket(configuration));
}
+ /**
+ * Skip a test if the test bucket is not an S3Express bucket.
+ * @param configuration configuration to probe
+ */
+ public static void skipIfNotS3ExpressBucket(
+ Configuration configuration) {
+ assume("Skipping test as bucket is not an S3Express bucket",
+ isS3ExpressTestBucket(configuration));
+ }
+
/**
* Is the test bucket an S3Express bucket?
* @param conf configuration
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateSessionTimeout.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateSessionTimeout.java
new file mode 100644
index 0000000000000..ebd771bddb3ff
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateSessionTimeout.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.SdkHttpRequest;
+import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest;
+import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest;
+import software.amazon.awssdk.http.auth.spi.signer.HttpSigner;
+import software.amazon.awssdk.http.auth.spi.signer.SignRequest;
+import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
+import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AWSApiCallTimeoutException;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.auth.CustomHttpSigner;
+import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
+import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
+import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
+import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
+import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION;
+import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotS3ExpressBucket;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test timeout of S3 Client CreateSession call, which was originally
+ * hard coded to 10 seconds.
+ * Only executed against an S3Express store.
+ */
+public class ITestCreateSessionTimeout extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestCreateSessionTimeout.class);
+
+ /**
+ * What is the duration for the operation after which the test is considered
+ * to have failed because timeouts didn't get passed down?
+ */
+ private static final long TIMEOUT_EXCEPTION_THRESHOLD = Duration.ofSeconds(5).toMillis();
+
+ /**
+ * How long to sleep in requests?
+ */
+ private static final AtomicLong SLEEP_DURATION = new AtomicLong(
+ Duration.ofSeconds(20).toMillis());
+
+ /**
+ * Flag set if the sleep was interrupted during signing.
+ */
+ private static final AtomicBoolean SLEEP_INTERRUPTED = new AtomicBoolean(false);
+
+ /**
+ * Create a configuration with a 10 millisecond timeout on API calls
+ * and a custom signer which sleeps much longer than that.
+ * @return the configuration.
+ */
+ @Override
+ public Configuration createConfiguration() {
+ final Configuration conf = super.createConfiguration();
+ skipIfNotS3ExpressBucket(conf);
+ disableFilesystemCaching(conf);
+ removeBaseAndBucketOverrides(conf,
+ CUSTOM_SIGNERS,
+ HTTP_SIGNER_ENABLED,
+ REQUEST_TIMEOUT,
+ RETRY_LIMIT,
+ S3A_BUCKET_PROBE,
+ S3EXPRESS_CREATE_SESSION,
+ SIGNING_ALGORITHM_S3
+ );
+
+ conf.setBoolean(HTTP_SIGNER_ENABLED, true);
+ conf.setClass(HTTP_SIGNER_CLASS_NAME, SlowSigner.class, HttpSigner.class);
+ Duration duration = Duration.ofMillis(10);
+
+ conf.setLong(REQUEST_TIMEOUT, duration.toMillis());
+ conf.setInt(RETRY_LIMIT, 1);
+
+ return conf;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ // remove the safety check on minimum durations.
+ AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
+ try {
+ super.setup();
+ } finally {
+ // restore the safety check on minimum durations.
+ AWSClientConfig.resetMinimumOperationDuration();
+ }
+ }
+
+ @Override
+ protected void deleteTestDirInTeardown() {
+ // no-op
+ }
+
+ /**
+ * Make this a no-op to avoid IO.
+ * @param path path path
+ */
+ @Override
+ protected void mkdirs(Path path) {
+
+ }
+
+ @Test
+ public void testSlowSigningTriggersTimeout() throws Throwable {
+
+ final S3AFileSystem fs = getFileSystem();
+ DurationInfo call = new DurationInfo(LOG, true, "Create session");
+ final AWSApiCallTimeoutException thrown = intercept(AWSApiCallTimeoutException.class,
+ () -> fs.getFileStatus(path("testShortTimeout")));
+ call.finished();
+ LOG.info("Exception raised after {}", call, thrown);
+ // if the timeout took too long, fail with details and include the original
+ // exception
+ if (call.value() > TIMEOUT_EXCEPTION_THRESHOLD) {
+ throw new AssertionError("Duration of create session " + call.getDurationString()
+ + " exceeds threshold " + TIMEOUT_EXCEPTION_THRESHOLD + " ms: " + thrown, thrown);
+ }
+ Assertions.assertThat(SLEEP_INTERRUPTED.get())
+ .describedAs("Sleep interrupted during signing")
+ .isTrue();
+
+ // now scan the inner exception stack for "createSession"
+ Arrays.stream(thrown.getCause().getStackTrace())
+ .filter(e -> e.getMethodName().equals("createSession"))
+ .findFirst()
+ .orElseThrow(() ->
+ new AssertionError("No createSession() in inner stack trace of", thrown));
+ }
+
+ /**
+ * Sleep for as long as {@link #SLEEP_DURATION} requires.
+ */
+ private static void sleep() {
+ long sleep = SLEEP_DURATION.get();
+ if (sleep > 0) {
+ LOG.info("Sleeping for {} ms", sleep, new Exception());
+ try (DurationInfo d = new DurationInfo(LOG, true, "Sleep for %d ms", sleep)) {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted", e);
+ SLEEP_INTERRUPTED.set(true);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * A signer which calls {@link #sleep()} before signing.
+ * As this signing takes place within the CreateSession Pipeline,
+ */
+ public static class SlowSigner extends CustomHttpSigner {
+
+ @Override
+ public SignedRequest sign(
+ final SignRequest extends AwsCredentialsIdentity> request) {
+
+ final SdkHttpRequest httpRequest = request.request();
+ LOG.info("Signing request {}", httpRequest);
+ sleep();
+ return super.sign(request);
+ }
+
+ @Override
+ public CompletableFuture signAsync(
+ final AsyncSignRequest extends AwsCredentialsIdentity> request) {
+ sleep();
+ return super.signAsync(request);
+ }
+
+ }
+}