From 15b18889a7fd9266985d6989e2052baf8b507f53 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 30 Jan 2024 15:32:24 +0000 Subject: [PATCH] HADOOP-19045. S3A: Validate CreateSession Timeout Propagation (#6470) New test ITestCreateSessionTimeout to verify that the duration set in fs.s3a.connection.request.timeout is passed all the way down. This is done by adding a sleep() in a custom signer and verifying that it is interrupted and that an AWSApiCallTimeoutException is raised. + Fix testRequestTimeout() * doesn't skip if considered cross-region * sets a minimum duration of 0 before invocation * resets the minimum afterwards Contributed by Steve Loughran --- .../org/apache/hadoop/fs/s3a/Constants.java | 25 ++- .../hadoop/fs/s3a/auth/CustomHttpSigner.java | 2 +- .../hadoop/fs/s3a/impl/AWSClientConfig.java | 2 +- .../hadoop/fs/s3a/ITestS3AConfiguration.java | 28 ++- .../apache/hadoop/fs/s3a/S3ATestUtils.java | 10 + .../ITestCreateSessionTimeout.java | 211 ++++++++++++++++++ 6 files changed, 262 insertions(+), 16 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateSessionTimeout.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 4408cf68a451e..744146ccf4f37 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -337,16 +337,33 @@ private Constants() { public static final int DEFAULT_SOCKET_TIMEOUT = (int)DEFAULT_SOCKET_TIMEOUT_DURATION.toMillis(); /** - * Time until a request is timed-out: {@value}. - * If zero, there is no timeout. + * How long should the SDK retry/wait on a response from an S3 store: {@value} + * including the time needed to sign the request. + *

+ * This is time to response, so for a GET request it is "time to 200 response" + * not the time limit to download the requested data. + * This makes it different from {@link #REQUEST_TIMEOUT}, which is for total + * HTTP request. + *

+ * Default unit is milliseconds. + *

+ * There is a minimum duration set in {@link #MINIMUM_NETWORK_OPERATION_DURATION}; + * it is impossible to set a delay less than this, even for testing. + * Why so? Too many deployments where the configuration assumed the timeout was in seconds + * and that "120" was a reasonable value rather than "too short to work reliably" + *

+ * Note for anyone writing tests which need to set a low value for this: + * to avoid the minimum duration overrides, call + * {@code AWSClientConfig.setMinimumOperationDuration()} and set a low value + * before creating the filesystem. */ public static final String REQUEST_TIMEOUT = "fs.s3a.connection.request.timeout"; /** - * Default duration of a request before it is timed out: Zero. + * Default duration of a request before it is timed out: 60s. */ - public static final Duration DEFAULT_REQUEST_TIMEOUT_DURATION = Duration.ZERO; + public static final Duration DEFAULT_REQUEST_TIMEOUT_DURATION = Duration.ofSeconds(60); /** * Default duration of a request before it is timed out: Zero. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomHttpSigner.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomHttpSigner.java index ba1169a5e5987..528414b63e32e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomHttpSigner.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomHttpSigner.java @@ -40,7 +40,7 @@ * fs.s3a.http.signer.class = org.apache.hadoop.fs.s3a.auth.CustomHttpSigner * */ -public final class CustomHttpSigner implements HttpSigner { +public class CustomHttpSigner implements HttpSigner { private static final Logger LOG = LoggerFactory .getLogger(CustomHttpSigner.class); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java index f6da9d84e0a77..60729ac30866a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java @@ -577,7 +577,7 @@ static ClientSettings createApiConnectionSettings(Configuration conf) { /** * Build the HTTP connection settings object from the configuration. - * All settings are calculated, including the api call timeout. + * All settings are calculated. * @param conf configuration to evaluate * @return connection settings. */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java index 8787fca431cc7..73bba9d62cbd8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java @@ -23,6 +23,7 @@ import java.net.ConnectException; import java.net.URI; import java.security.PrivilegedExceptionAction; +import java.time.Duration; import org.assertj.core.api.Assertions; import org.junit.Rule; @@ -49,6 +50,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.auth.STSClientFactory; +import org.apache.hadoop.fs.s3a.impl.AWSClientConfig; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -435,16 +437,22 @@ public void testCustomUserAgent() throws Exception { @Test public void testRequestTimeout() throws Exception { conf = new Configuration(); - skipIfCrossRegionClient(conf); - conf.set(REQUEST_TIMEOUT, "120"); - fs = S3ATestUtils.createTestFileSystem(conf); - S3Client s3 = getS3Client("Request timeout (ms)"); - SdkClientConfiguration clientConfiguration = getField(s3, SdkClientConfiguration.class, - "clientConfiguration"); - assertEquals("Configured " + REQUEST_TIMEOUT + - " is different than what AWS sdk configuration uses internally", - 120000, - clientConfiguration.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT).toMillis()); + // remove the safety check on minimum durations. + AWSClientConfig.setMinimumOperationDuration(Duration.ZERO); + try { + Duration timeout = Duration.ofSeconds(120); + conf.set(REQUEST_TIMEOUT, timeout.getSeconds() + "s"); + fs = S3ATestUtils.createTestFileSystem(conf); + S3Client s3 = getS3Client("Request timeout (ms)"); + SdkClientConfiguration clientConfiguration = getField(s3, SdkClientConfiguration.class, + "clientConfiguration"); + Assertions.assertThat(clientConfiguration.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT)) + .describedAs("Configured " + REQUEST_TIMEOUT + + " is different than what AWS sdk configuration uses internally") + .isEqualTo(timeout); + } finally { + AWSClientConfig.resetMinimumOperationDuration(); + } } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index ed1fda316dfe5..e7ea920d8a0a0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -550,6 +550,16 @@ public static void skipIfS3ExpressBucket( !isS3ExpressTestBucket(configuration)); } + /** + * Skip a test if the test bucket is not an S3Express bucket. + * @param configuration configuration to probe + */ + public static void skipIfNotS3ExpressBucket( + Configuration configuration) { + assume("Skipping test as bucket is not an S3Express bucket", + isS3ExpressTestBucket(configuration)); + } + /** * Is the test bucket an S3Express bucket? * @param conf configuration diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateSessionTimeout.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateSessionTimeout.java new file mode 100644 index 0000000000000..ebd771bddb3ff --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateSessionTimeout.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.performance; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.SdkHttpRequest; +import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest; +import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest; +import software.amazon.awssdk.http.auth.spi.signer.HttpSigner; +import software.amazon.awssdk.http.auth.spi.signer.SignRequest; +import software.amazon.awssdk.http.auth.spi.signer.SignedRequest; +import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AWSApiCallTimeoutException; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.auth.CustomHttpSigner; +import org.apache.hadoop.fs.s3a.impl.AWSClientConfig; +import org.apache.hadoop.util.DurationInfo; + +import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS; +import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME; +import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; +import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT; +import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE; +import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION; +import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotS3ExpressBucket; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test timeout of S3 Client CreateSession call, which was originally + * hard coded to 10 seconds. + * Only executed against an S3Express store. + */ +public class ITestCreateSessionTimeout extends AbstractS3ACostTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestCreateSessionTimeout.class); + + /** + * What is the duration for the operation after which the test is considered + * to have failed because timeouts didn't get passed down? + */ + private static final long TIMEOUT_EXCEPTION_THRESHOLD = Duration.ofSeconds(5).toMillis(); + + /** + * How long to sleep in requests? + */ + private static final AtomicLong SLEEP_DURATION = new AtomicLong( + Duration.ofSeconds(20).toMillis()); + + /** + * Flag set if the sleep was interrupted during signing. + */ + private static final AtomicBoolean SLEEP_INTERRUPTED = new AtomicBoolean(false); + + /** + * Create a configuration with a 10 millisecond timeout on API calls + * and a custom signer which sleeps much longer than that. + * @return the configuration. + */ + @Override + public Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + skipIfNotS3ExpressBucket(conf); + disableFilesystemCaching(conf); + removeBaseAndBucketOverrides(conf, + CUSTOM_SIGNERS, + HTTP_SIGNER_ENABLED, + REQUEST_TIMEOUT, + RETRY_LIMIT, + S3A_BUCKET_PROBE, + S3EXPRESS_CREATE_SESSION, + SIGNING_ALGORITHM_S3 + ); + + conf.setBoolean(HTTP_SIGNER_ENABLED, true); + conf.setClass(HTTP_SIGNER_CLASS_NAME, SlowSigner.class, HttpSigner.class); + Duration duration = Duration.ofMillis(10); + + conf.setLong(REQUEST_TIMEOUT, duration.toMillis()); + conf.setInt(RETRY_LIMIT, 1); + + return conf; + } + + @Override + public void setup() throws Exception { + // remove the safety check on minimum durations. + AWSClientConfig.setMinimumOperationDuration(Duration.ZERO); + try { + super.setup(); + } finally { + // restore the safety check on minimum durations. + AWSClientConfig.resetMinimumOperationDuration(); + } + } + + @Override + protected void deleteTestDirInTeardown() { + // no-op + } + + /** + * Make this a no-op to avoid IO. + * @param path path path + */ + @Override + protected void mkdirs(Path path) { + + } + + @Test + public void testSlowSigningTriggersTimeout() throws Throwable { + + final S3AFileSystem fs = getFileSystem(); + DurationInfo call = new DurationInfo(LOG, true, "Create session"); + final AWSApiCallTimeoutException thrown = intercept(AWSApiCallTimeoutException.class, + () -> fs.getFileStatus(path("testShortTimeout"))); + call.finished(); + LOG.info("Exception raised after {}", call, thrown); + // if the timeout took too long, fail with details and include the original + // exception + if (call.value() > TIMEOUT_EXCEPTION_THRESHOLD) { + throw new AssertionError("Duration of create session " + call.getDurationString() + + " exceeds threshold " + TIMEOUT_EXCEPTION_THRESHOLD + " ms: " + thrown, thrown); + } + Assertions.assertThat(SLEEP_INTERRUPTED.get()) + .describedAs("Sleep interrupted during signing") + .isTrue(); + + // now scan the inner exception stack for "createSession" + Arrays.stream(thrown.getCause().getStackTrace()) + .filter(e -> e.getMethodName().equals("createSession")) + .findFirst() + .orElseThrow(() -> + new AssertionError("No createSession() in inner stack trace of", thrown)); + } + + /** + * Sleep for as long as {@link #SLEEP_DURATION} requires. + */ + private static void sleep() { + long sleep = SLEEP_DURATION.get(); + if (sleep > 0) { + LOG.info("Sleeping for {} ms", sleep, new Exception()); + try (DurationInfo d = new DurationInfo(LOG, true, "Sleep for %d ms", sleep)) { + Thread.sleep(sleep); + } catch (InterruptedException e) { + LOG.info("Interrupted", e); + SLEEP_INTERRUPTED.set(true); + Thread.currentThread().interrupt(); + } + } + } + + /** + * A signer which calls {@link #sleep()} before signing. + * As this signing takes place within the CreateSession Pipeline, + */ + public static class SlowSigner extends CustomHttpSigner { + + @Override + public SignedRequest sign( + final SignRequest request) { + + final SdkHttpRequest httpRequest = request.request(); + LOG.info("Signing request {}", httpRequest); + sleep(); + return super.sign(request); + } + + @Override + public CompletableFuture signAsync( + final AsyncSignRequest request) { + sleep(); + return super.signAsync(request); + } + + } +}