diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORateLimiter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORateLimiter.java new file mode 100644 index 0000000000000..0e33f3afe910f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORateLimiter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.time.Duration; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * An optional interface for classes that provide rate limiters. + * For a filesystem source, the operation name SHOULD be one of + * those listed in + * {@link org.apache.hadoop.fs.statistics.StoreStatisticNames} + * if the operation is listed there. + *

+ * This interface is intended to be exported by FileSystems so that
+ * applications wishing to perform bulk operations may request access
+ * to a rate limiter which is shared across all threads interacting
+ * with the store.
+ * That is: the rate limiting is global to the specific instance of the
+ * object implementing this interface.
+ *

+ * It is not expected to be shared with other instances of the same + * class, or across processes. + *

+ * This means it is primarily of benefit when limiting bulk operations
+ * which can overload an (object) store from a small pool of threads.
+ * Examples of this can include the parallel deletion of task attempt
+ * directories during job cleanup.
+ *

+ * In cluster applications, it is more likely that rate limiting is
+ * useful during job commit operations, especially in processes such
+ * as Spark Drivers, which may be committing work from multiple jobs over
+ * a short period of time.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface IORateLimiter {
+
+  /**
+   * Acquire IO capacity.
+   *

+ * The implementation may assign different costs to the different + * operations. + *

+ * If there is not enough space, the permits will be acquired, + * but the subsequent call will block until the capacity has been + * refilled. + *

+ * The path parameter is used to support stores where there may be different throttling + * under different paths. + * @param operation operation being performed. Must not be null, may be "", + * should be from {@link org.apache.hadoop.fs.statistics.StoreStatisticNames} + * where there is a matching operation. + * @param src path under which the operations will be initiated. + * @param dest destination path for rename operations + * @param requestedCapacity capacity to acquire. + * Must be greater than or equal to 0. + * @return time spent waiting for output. + */ + Duration acquireIOCapacity(String operation, Path src, Path dest, int requestedCapacity); + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/IORateLimiterSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/IORateLimiterSupport.java new file mode 100644 index 0000000000000..b9ed4c115ef4b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/IORateLimiterSupport.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import org.apache.hadoop.fs.IORateLimiter; +import org.apache.hadoop.util.RateLimiting; +import org.apache.hadoop.util.RateLimitingFactory; + +import static java.util.Objects.requireNonNull; + +/** + * Implementation support for the IO rate limiter. + */ +public final class IORateLimiterSupport { + + private IORateLimiterSupport() { + } + + /** + * Get a rate limiter source which has no rate limiting. + * @return a rate limiter source which has no rate limiting. + */ + public static IORateLimiter unlimited() { + return (operation, src, dest, requestedCapacity) -> { + requireNonNull(operation, "operation"); + return RateLimitingFactory.unlimitedRate().acquire(requestedCapacity); + }; + } + + /** + * Create a rate limiter with a fixed capacity. + * @param capacityPerSecond capacity per second. + * @return a rate limiter. 
+ */ + public static IORateLimiter create(int capacityPerSecond) { + final RateLimiting limiting = RateLimitingFactory.create(capacityPerSecond); + return (operation, src, dest, requestedCapacity) -> { + requireNonNull(operation, "operation"); + return limiting.acquire(requestedCapacity); + }; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java index 19ee9d1414ecf..1970dbb68d0fd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java @@ -63,6 +63,9 @@ public final class StoreStatisticNames { /** {@value}. */ public static final String OP_DELETE = "op_delete"; + /** {@value}. */ + public static final String OP_DELETE_DIR = "op_delete_dir"; + /** {@value}. */ public static final String OP_EXISTS = "op_exists"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java index ae119c0e630f4..367e236dac8b7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java @@ -28,8 +28,10 @@ * Can be used to throttle use of object stores where excess load * will trigger cluster-wide throttling, backoff etc. and so collapse * performance. + *

* The time waited is returned as a Duration type. - * The google rate limiter implements this by allowing a caller to ask for + *

+ * The google rate limiter implements rate limiting by allowing a caller to ask for * more capacity than is available. This will be granted * but the subsequent request will be blocked if the bucket of * capacity hasn't let refilled to the point where there is @@ -44,8 +46,11 @@ public interface RateLimiting { * If there is not enough space, the permits will be acquired, * but the subsequent call will block until the capacity has been * refilled. + *

+ * If the capacity is zero, no delay will take place. * @param requestedCapacity capacity to acquire. - * @return time spent waiting for output. + * Must be greater than or equal to 0. + * @return time spent waiting to acquire the capacity.. */ Duration acquire(int requestedCapacity); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java index 621415456e125..fb5c45f0e0305 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter; +import static org.apache.hadoop.util.Preconditions.checkArgument; + /** * Factory for Rate Limiting. * This should be only place in the code where the guava RateLimiter is imported. @@ -50,6 +52,7 @@ private static class NoRateLimiting implements RateLimiting { @Override public Duration acquire(int requestedCapacity) { + checkArgument(requestedCapacity >= 0, "requestedCapacity must be >= 0"); return INSTANTLY; } } @@ -70,6 +73,11 @@ private RestrictedRateLimiting(int capacityPerSecond) { @Override public Duration acquire(int requestedCapacity) { + checkArgument(requestedCapacity >= 0, "requestedCapacity must be >= 0"); + if (requestedCapacity == 0) { + // google limiter does not do this. + return INSTANTLY; + } final double delayMillis = limiter.acquire(requestedCapacity); return delayMillis == 0 ? INSTANTLY diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java index c3fda19d8d73b..0dff801c92351 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -29,6 +31,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -55,6 +60,9 @@ @InterfaceStability.Unstable public final class FutureIO { + private static final Logger LOG = + LoggerFactory.getLogger(FutureIO.class); + private FutureIO() { } @@ -275,4 +283,44 @@ public static CompletableFuture eval( } return result; } + + /** + * Wait for all the futures to complete; the caller should pass down + * small sleep interval between each iteration; enough to yield the CPU. + * @param futures futures. + * @param sleepInterval Interval in milliseconds to await completion. 
+ */ + public static void awaitAllFutures(final Collection> futures, + final Duration sleepInterval) { + int size = futures.size(); + LOG.debug("Waiting for {} tasks to complete", size); + if (size == 0) { + // shortcut for empty list. + return; + } + int oldNumFinished = 0; + while (true) { + int numFinished = (int) futures.stream() + .filter(Future::isDone) + .count(); + + if (oldNumFinished != numFinished) { + LOG.debug("Finished count -> {}/{}", numFinished, size); + oldNumFinished = numFinished; + } + + if (numFinished == size) { + // all of the futures are done, stop looping + break; + } else { + try { + Thread.sleep(sleepInterval.toMillis()); + } catch (InterruptedException e) { + futures.forEach(future -> future.cancel(true)); + Thread.currentThread().interrupt(); + break; + } + } + } + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java index c9e6d0b78ac11..2c04fa5574496 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java @@ -19,6 +19,7 @@ package org.apache.hadoop.util.functional; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -40,6 +41,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticsContext; import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.util.functional.FutureIO.awaitAllFutures; import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable; /** @@ -466,7 +468,7 @@ private boolean runParallel(final Task task) taskFailed.set(true); } // let the above tasks complete (or abort) - waitFor(futures, sleepInterval); + awaitAllFutures(futures, Duration.ofMillis(sleepInterval)); int futureCount = futures.size(); futures.clear(); @@ -498,7 +500,7 @@ private boolean runParallel(final Task task) } // let the revert tasks complete - waitFor(futures, sleepInterval); + awaitAllFutures(futures, Duration.ofMillis(sleepInterval)); } // give priority to execution exceptions over @@ -539,39 +541,6 @@ private void resetStatisticsContext() { } } - /** - * Wait for all the futures to complete; there's a small sleep between - * each iteration; enough to yield the CPU. - * @param futures futures. - * @param sleepInterval Interval in milliseconds to await completion. - */ - private static void waitFor(Collection> futures, int sleepInterval) { - int size = futures.size(); - LOG.debug("Waiting for {} tasks to complete", size); - int oldNumFinished = 0; - while (true) { - int numFinished = (int) futures.stream().filter(Future::isDone).count(); - - if (oldNumFinished != numFinished) { - LOG.debug("Finished count -> {}/{}", numFinished, size); - oldNumFinished = numFinished; - } - - if (numFinished == size) { - // all of the futures are done, stop looping - break; - } else { - try { - Thread.sleep(sleepInterval); - } catch (InterruptedException e) { - futures.forEach(future -> future.cancel(true)); - Thread.currentThread().interrupt(); - break; - } - } - } - } - /** * Create a task builder for the iterable. * @param items item source. 
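A minimal usage sketch (not part of the patch) of the `FutureIO.awaitAllFutures()` helper added above, which `TaskPool` now delegates to. It assumes the parameter type is `Collection<Future<?>>` (the generics were lost in the text above); the class name and the 100ms polling interval are illustrative only.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.util.functional.FutureIO;

public class AwaitAllFuturesExample {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      final int id = i;
      futures.add(pool.submit(() -> System.out.println("task " + id)));
    }
    // Polls every 100ms until every future reports isDone(); if interrupted,
    // the helper cancels the outstanding futures and re-sets the interrupt flag.
    FutureIO.awaitAllFutures(futures, Duration.ofMillis(100));
    pool.shutdown();
  }
}
```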
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestIORateLimiter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestIORateLimiter.java new file mode 100644 index 0000000000000..6de11eef06533 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestIORateLimiter.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.time.Duration; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.impl.IORateLimiterSupport; +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.util.RateLimiting; +import org.apache.hadoop.util.RateLimitingFactory; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test IO rate limiting. + * This includes: illegal arguments, and what if more capacity + * is requested than is available. + */ +public class TestIORateLimiter extends AbstractHadoopTestBase { + + private static final Logger LOG = LoggerFactory.getLogger( + TestIORateLimiter.class); + + @Test + public void testAcquireIOCapacity() { + final int size = 10; + final RateLimiting limiter = RateLimitingFactory.create(size); + // do a chain of requests + limiter.acquire(0); + limiter.acquire(1); + limiter.acquire(2); + + // now ask for more than is allowed. This MUST work. + final int excess = size * 2; + limiter.acquire(excess); + assertDelayed(limiter, excess); + } + + @Test + public void testNegativeCapacityRejected() throws Throwable { + final RateLimiting limiter = RateLimitingFactory.create(1); + intercept(IllegalArgumentException.class, () -> + limiter.acquire(-1)); + } + + @Test + public void testNegativeLimiterCapacityRejected() throws Throwable { + intercept(IllegalArgumentException.class, () -> + RateLimitingFactory.create(-1)); + } + + /** + * This is a key behavior: it is acceptable to ask for more capacity + * than the caller has, the initial request must be granted, + * but the followup request must be delayed until enough capacity + * has been restored. + */ + @Test + public void testAcquireExcessIOCapacity() { + final int size = 10; + final RateLimiting limiter = RateLimitingFactory.create(size); + + // now ask for more than is allowed. This MUST work. + final int excess = size * 2; + // first attempt gets more capacity than arrives every second. + assertNotDelayed(limiter, excess); + // second attempt will block + assertDelayed(limiter, excess); + // third attempt will block + assertDelayed(limiter, size); + // as these are short-cut, no delays. 
+ assertNotDelayed(limiter, 0); + } + + @Test + public void testLimitedCapacity() { + final int size = 10; + final IORateLimiter limiter = IORateLimiterSupport.create(size); + final int excess = size * 2; + // first attempt gets more capacity than arrives every second. + assertNotDelayed(limiter, "", excess); + // second attempt will block + assertDelayed(limiter, "", excess); + // third attempt will block + assertDelayed(limiter, "", size); + // as these are short-cut, no delays. + assertNotDelayed(limiter, "", 0); + } + + @Test + public void testUnlimitedRejectsNegativeCapacity() throws Exception { + intercept(IllegalArgumentException.class, () -> + IORateLimiterSupport.unlimited().acquireIOCapacity("", new Path("/"), null, -1)); + } + + @Test + public void testUnlimitedRejectsNullOperation() throws Exception { + intercept(NullPointerException.class, () -> + IORateLimiterSupport.unlimited().acquireIOCapacity(null, new Path("/"), null, 0)); + } + + /** + * Assert that a request for a given capacity is delayed. + * There's no assertion on the duration, only that it is greater than 0. + * @param limiter limiter + * @param capacity capacity + */ + private static void assertNotDelayed(final RateLimiting limiter, final int capacity) { + assertZeroDuration(capacity, limiter.acquire(capacity)); + } + + /** + * Assert that a request for a given capacity is delayed. + * There's no assertion on the duration, only that it is greater than 0. + * @param limiter limiter + * @param capacity capacity + */ + private static void assertDelayed(final RateLimiting limiter, final int capacity) { + assertNonZeroDuration(capacity, limiter.acquire(capacity)); + } + + /** + * Assert that a request for a given capacity is not delayed. + * @param limiter limiter + * @param op operation + * @param capacity capacity + */ + private static void assertNotDelayed(IORateLimiter limiter, String op, int capacity) { + assertZeroDuration(capacity, limiter.acquireIOCapacity(op, new Path("/"), null, capacity)); + } + + /** + * Assert that a request for a given capacity is delayed. + * There's no assertion on the duration, only that it is greater than 0. + * @param limiter limiter + * @param op operation + * @param capacity capacity + */ + private static void assertDelayed(IORateLimiter limiter, String op, int capacity) { + assertNonZeroDuration(capacity, limiter.acquireIOCapacity(op, new Path("/"), null, capacity)); + } + + /** + * Assert that duration was not zero. + * @param capacity capacity requested + * @param duration duration + */ + private static void assertNonZeroDuration(final int capacity, final Duration duration) { + LOG.info("Delay for {} capacity: {}", capacity, duration); + Assertions.assertThat(duration) + .describedAs("delay for %d capacity", capacity) + .isGreaterThan(Duration.ZERO); + } + + /** + * Assert that duration was zero. 
+ * @param capacity capacity requested + * @param duration duration + */ + private static void assertZeroDuration(final int capacity, final Duration duration) { + Assertions.assertThat(duration) + .describedAs("delay for %d capacity", capacity) + .isEqualTo(Duration.ZERO); + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java index 8a1ae0fcc9810..4b0acf08c0230 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java @@ -41,6 +41,7 @@ import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.*; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.DELETE_DIR_CAPACITY; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.buildJobUUID; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.getAppAttemptId; @@ -153,6 +154,11 @@ public final class ManifestCommitterConfig implements IOStatisticsSource { */ private final int writerQueueCapacity; + /** + * Capacity for directory delete operations. + */ + private int deleteDirCapacity = DELETE_DIR_CAPACITY; + /** * Constructor. * @param outputPath destination path of the job. @@ -198,6 +204,9 @@ public final class ManifestCommitterConfig implements IOStatisticsSource { this.writerQueueCapacity = conf.getInt( OPT_WRITER_QUEUE_CAPACITY, DEFAULT_WRITER_QUEUE_CAPACITY); + this.deleteDirCapacity = conf.getInt( + OPT_DELETE_DIR_CAPACITY, + DELETE_DIR_CAPACITY); // if constructed with a task attempt, build the task ID and path. 
if (context instanceof TaskAttemptContext) { @@ -272,7 +281,9 @@ StageConfig createStageConfig() { .withTaskAttemptDir(taskAttemptDir) .withTaskAttemptId(taskAttemptId) .withTaskId(taskId) - .withWriterQueueCapacity(writerQueueCapacity); + .withWriterQueueCapacity(writerQueueCapacity) + .withDeleteDirCapacity(deleteDirCapacity); + return stageConfig; } @@ -340,6 +351,10 @@ public int getWriterQueueCapacity() { return writerQueueCapacity; } + public int getDeleteDirCapacity() { + return deleteDirCapacity; + } + @Override public IOStatisticsStore getIOStatistics() { return iostatistics; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java index dc5ccb2e1df3a..01db45214e708 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java @@ -143,6 +143,13 @@ public final class ManifestCommitterConstants { */ public static final boolean OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT = true; + /** + * How much write capacity to request when deleting any directory + * Value: {@value}. + */ + public static final String OPT_DELETE_DIR_CAPACITY = + OPT_PREFIX + "delete.dir.capacity"; + /** * Threads to use for IO. */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java index 15f9899f3551e..fe19db4fdd1c3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java @@ -40,6 +40,7 @@ */ @InterfaceAudience.Private public final class InternalConstants { + private InternalConstants() { } @@ -127,4 +128,45 @@ private InternalConstants() { /** Schemas of filesystems we know to not work with this committer. */ public static final Set UNSUPPORTED_FS_SCHEMAS = ImmutableSet.of("s3a", "wasb"); + + /** + * How many attempts to commit the task by save and rename + * before giving up: {@value}. + */ + public static final int TASK_COMMIT_RETRY_COUNT = 3; + + /** + * Capacity for getFileStatus and similar HEAD requests: {@value}. + */ + public static final int GET_FILE_STATUS_CAPACITY = 1; + + /** + * Capacity for deleting a single file: {@value}. + */ + public static final int DELETE_FILE_CAPACITY = 1; + + /** + * Capacity for deleting a directory. + * This is considered more expensive, and may be configured. + * Value: {@value}. + */ + public static final int DELETE_DIR_CAPACITY = 100; + + /** + * Capacity for listing: {@value}. 
+ */ + public static final int LIST_CAPACITY = 1; + + /** + * Capacity for mkdirs: {@value}. + */ + public static final int MKDIRS_CAPACITY = 1; + + /** + * Capacity for rename, includes optional LIST request; + * does not worry about total depth or file count. + * Value: {@value}. + */ + public static final int RENAME_CAPACITY = 1; + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java index b81fa9dd32add..7d95d0cfe7ac8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java @@ -28,8 +28,10 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.IORateLimiter; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.impl.IORateLimiterSupport; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; @@ -45,7 +47,7 @@ */ @InterfaceAudience.LimitedPrivate("mapreduce, object-stores") @InterfaceStability.Unstable -public abstract class ManifestStoreOperations implements Closeable { +public abstract class ManifestStoreOperations implements Closeable, IORateLimiter { /** * Bind to the filesystem. @@ -97,6 +99,35 @@ public boolean isFile(Path path) throws IOException { public abstract boolean delete(Path path, boolean recursive) throws IOException; + /** + * Forward to {@code delete(Path, true)} + * unless overridden. + *

+ * If it returns without an error: there is nothing at + * the end of the path. + * @param path path + * @return outcome + * @throws IOException failure. + */ + public boolean deleteFile(Path path) + throws IOException { + return delete(path, true); + } + + /** + * Acquire the delete capacity then call {@code FileSystem#delete(Path, true)} + * or equivalent. + *

+ * If it returns without an error: there is nothing at + * the end of the path. + * @param path path + * @param capacity IO capacity to ask for. + * @return outcome + * @throws IOException failure. + */ + public abstract boolean rmdir(Path path, int capacity) + throws IOException; + /** * Forward to {@link FileSystem#mkdirs(Path)}. * Usual "what does 'false' mean" ambiguity. @@ -288,4 +319,22 @@ public Duration getWaitTime() { } } + /** + * Get the rate limiter source. + * Shall never be null; may be unlimited. + * @return the rate limiter source. + */ + public IORateLimiter rateLimiterSource() { + return IORateLimiterSupport.unlimited(); + } + + /** + * Delegate to {@link #rateLimiterSource()}. + * {@inheritDoc} + */ + @Override + public Duration acquireIOCapacity(final String operation, final Path src, final Path dest, final int requestedCapacity) { + return rateLimiterSource().acquireIOCapacity(operation, new Path("/"), null, requestedCapacity); + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java index 9a0b972bc735b..ec5023c484042 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java @@ -25,12 +25,21 @@ import org.apache.hadoop.fs.CommonPathCapabilities; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.IORateLimiter; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; import org.apache.hadoop.util.JsonSerialization; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.*; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.DELETE_DIR_CAPACITY; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.DELETE_FILE_CAPACITY; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.GET_FILE_STATUS_CAPACITY; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.LIST_CAPACITY; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.MKDIRS_CAPACITY; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.RENAME_CAPACITY; + /** * Implementation of manifest store operations through the filesystem API. 
* This class is subclassed in the ABFS module, which does add the resilient @@ -87,6 +96,7 @@ public void bindToFileSystem(FileSystem filesystem, Path path) throws IOExceptio @Override public FileStatus getFileStatus(Path path) throws IOException { + acquireIOCapacity(OP_GET_FILE_STATUS, new Path("/"), null, GET_FILE_STATUS_CAPACITY); return fileSystem.getFileStatus(path); } @@ -99,30 +109,47 @@ public FileStatus getFileStatus(Path path) throws IOException { @SuppressWarnings("deprecation") @Override public boolean isFile(Path path) throws IOException { + acquireIOCapacity(OP_IS_FILE, new Path("/"), null, GET_FILE_STATUS_CAPACITY); return fileSystem.isFile(path); } + /** + * Delete a path. + * The capacity to acquire is based on the recursive flag. + * {@inheritDoc} + */ @Override public boolean delete(Path path, boolean recursive) throws IOException { + acquireIOCapacity(OP_DELETE, + new Path("/"), null, recursive ? DELETE_FILE_CAPACITY : DELETE_DIR_CAPACITY); return fileSystem.delete(path, recursive); } + @Override + public boolean rmdir(final Path path, final int capacity) throws IOException { + acquireIOCapacity(OP_DELETE_DIR, new Path("/"), null, capacity); + return fileSystem.delete(path, true); + } + @Override public boolean mkdirs(Path path) throws IOException { + acquireIOCapacity(OP_MKDIRS, new Path("/"), null, MKDIRS_CAPACITY); return fileSystem.mkdirs(path); } @Override public boolean renameFile(Path source, Path dest) throws IOException { + acquireIOCapacity(OP_RENAME, new Path("/"), null, RENAME_CAPACITY); return fileSystem.rename(source, dest); } @Override public RemoteIterator listStatusIterator(Path path) throws IOException { + acquireIOCapacity(OP_LIST_STATUS, new Path("/"), null, LIST_CAPACITY); return fileSystem.listStatusIterator(path); } @@ -130,6 +157,7 @@ public RemoteIterator listStatusIterator(Path path) public TaskManifest loadTaskManifest( JsonSerialization serializer, FileStatus st) throws IOException { + acquireIOCapacity(OP_OPENFILE, new Path("/"), null, 1); return TaskManifest.load(serializer, fileSystem, st.getPath(), st); } @@ -138,6 +166,7 @@ public > void save( final T manifestData, final Path path, final boolean overwrite) throws IOException { + acquireIOCapacity(OP_CREATE, new Path("/"), null, 1); manifestData.save(fileSystem, path, overwrite); } @@ -175,6 +204,7 @@ public void msync(Path path) throws IOException { // qualify so we can be confident that the FS being synced // is the one we expect. fileSystem.makeQualified(path); + acquireIOCapacity(OP_MSYNC, new Path("/"), null, 1); try { fileSystem.msync(); } catch (UnsupportedOperationException ignored) { @@ -184,4 +214,16 @@ public void msync(Path path) throws IOException { } } + /** + * If the FS is a rate limiter source, return it, + * else the superclass (unlimited) implementation. + * @return a rate limiter source. + */ + @Override + public IORateLimiter rateLimiterSource() { + final FileSystem fs = getFileSystem(); + return fs instanceof IORateLimiter + ? 
(IORateLimiter) fs + : super.rateLimiterSource(); + } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java index c2b44c2a924fd..9ff47c97c5500 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE_DIR; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_ABORT_TASK; /** @@ -55,7 +56,7 @@ protected Path executeStage(final Boolean suppressExceptions) final Path dir = getTaskAttemptDir(); if (dir != null) { LOG.info("{}: Deleting task attempt directory {}", getName(), dir); - deleteDir(dir, suppressExceptions); + deleteDirSuppressingExceptions(dir, OP_DELETE_DIR); } return dir; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java index 161153c82faac..e9ea017bbb6f8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java @@ -45,6 +45,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE_DIR; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS; @@ -299,12 +300,12 @@ public void addExecutionDurationToStatistics(IOStatisticsStore iostats, /** * Note any rate limiting to the given timing statistic. - * If the wait was 0, no statistics are updated. + * If the wait was null/0, no statistics are updated. * @param statistic statistic key. * @param wait wait duration. 
*/ - private void noteAnyRateLimiting(String statistic, Duration wait) { - if (!wait.isZero()) { + protected void noteAnyRateLimiting(String statistic, Duration wait) { + if (wait != null && !wait.isZero()) { // rate limiting took place getIOStatistics().addTimedOperation( statistic, @@ -412,9 +413,8 @@ protected final boolean isFile( final Path path) throws IOException { LOG.trace("{}: isFile('{}')", getName(), path); - return trackDuration(getIOStatistics(), OP_IS_FILE, () -> { - return operations.isFile(path); - }); + return trackDuration(getIOStatistics(), OP_IS_FILE, () -> + operations.isFile(path)); } /** @@ -445,9 +445,29 @@ protected Boolean delete( final boolean recursive, final String statistic) throws IOException { - return trackDuration(getIOStatistics(), statistic, () -> { - return operations.delete(path, recursive); - }); + if (recursive) { + return deleteDir(path, statistic); + } else { + return deleteFile(path, statistic); + } + } + + /** + * Delete a file at a path. + *

+ * If it returns without an error: there is nothing at + * the end of the path. + * @param path path + * @param statistic statistic to update + * @return outcome. + * @throws IOException IO Failure. + */ + protected boolean deleteFile( + final Path path, + final String statistic) + throws IOException { + return trackDuration(getIOStatistics(), statistic, () -> + operations.deleteFile(path)); } /** @@ -690,6 +710,8 @@ protected boolean storeSupportsResilientCommit() { * Maybe delete the destination. * This routine is optimized for the data not existing, as HEAD seems to cost less * than a DELETE; assuming most calls don't have data, this is faster. + * If the destination exists, {@link #deleteDir(Path, String)} is invoked + * so as to require more IO capacity. * @param deleteDest should an attempt to delete the dest be made? * @param dest destination path * @throws IOException IO failure, including permissions. @@ -697,11 +719,14 @@ protected boolean storeSupportsResilientCommit() { private void maybeDeleteDest(final boolean deleteDest, final Path dest) throws IOException { if (deleteDest && getFileStatusOrNull(dest) != null) { - - boolean deleted = delete(dest, true); - // log the outcome in case of emergency diagnostics traces - // being needed. - LOG.debug("{}: delete('{}') returned {}'", getName(), dest, deleted); + final FileStatus st = getFileStatusOrNull(dest); + if (st != null) { + if (st.isDirectory()) { + deleteDir(dest, OP_DELETE_DIR); + } else { + deleteFile(dest, OP_DELETE); + } + } } } @@ -915,26 +940,36 @@ protected final TaskPool.Submitter getIOProcessors(int size) { } /** - * Delete a directory, possibly suppressing exceptions. + * Delete a directory. * @param dir directory. - * @param suppressExceptions should exceptions be suppressed? + * @param statistic statistic to use + * @return true if the path is no longer present. * @throws IOException exceptions raised in delete if not suppressed. - * @return any exception caught and suppressed */ - protected IOException deleteDir( + protected boolean deleteDir( final Path dir, - final Boolean suppressExceptions) + final String statistic) + throws IOException { + return trackDuration(getIOStatistics(), statistic, () -> + operations.rmdir(dir, stageConfig.getDeleteDirCapacity())); + } + + /** + * Delete a directory, suprressing exceptions. + * @param dir directory. + * @param statistic statistic to use + * @return any exception caught. 
+ */ + protected IOException deleteDirSuppressingExceptions( + final Path dir, + final String statistic) throws IOException { try { - delete(dir, true); + deleteDir(dir, statistic); return null; } catch (IOException ex) { LOG.info("Error deleting {}: {}", dir, ex.toString()); - if (!suppressExceptions) { - throw ex; - } else { - return ex; - } + return ex; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java index 77b80aaf67fd6..b36027c99055d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java @@ -35,6 +35,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE_DIR; import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED; import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT; import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED; @@ -49,7 +50,7 @@ * Returns: the outcome of the overall operation * The result is detailed purely for the benefit of tests, which need * to make assertions about error handling and fallbacks. - * + *

* There's a few known issues with the azure and GCS stores which * this stage tries to address. * - Google GCS directory deletion is O(entries), so is slower for big jobs. @@ -57,19 +58,21 @@ * when not the store owner triggers a scan down the tree to verify the * caller has the permission to delete each subdir. * If this scan takes over 90s, the operation can time out. - * + * - Azure storage requires IO capacity based on the number of subdirectories. + *

* The main solution for both of these is that task attempts are * deleted in parallel, in different threads. * This will speed up GCS cleanup and reduce the risk of * abfs related timeouts. + *

* Exceptions during cleanup can be suppressed, * so that these do not cause the job to fail. - * + *

* Also, some users want to be able to run multiple independent jobs * targeting the same output directory simultaneously. * If one job deletes the directory `__temporary` all the others * will fail. - * + *

* This can be addressed by disabling cleanup entirely. * */ @@ -219,7 +222,7 @@ protected Result executeStage( * Delete a single TA dir in a parallel task. * Updates the audit context. * Exceptions are swallowed so that attempts are still made - * to delete the others, but the first exception + * to delete the others, but one of any exceptions raised. * caught is saved in a field which can be retrieved * via {@link #getLastDeleteException()}. * @@ -246,7 +249,7 @@ private IOException deleteOneDir(final Path dir) throws IOException { deleteDirCount.incrementAndGet(); - IOException ex = deleteDir(dir, true); + final IOException ex = deleteDirSuppressingExceptions(dir, OP_DELETE_DIR); if (ex != null) { deleteFailure(ex); } @@ -258,8 +261,9 @@ private IOException deleteOneDir(final Path dir) * @param ex exception */ private synchronized void deleteFailure(IOException ex) { - // excaption: add the count + // exception: add the count deleteFailureCount.incrementAndGet(); + // and save the exception, overwriting any predecessor. lastDeleteException = ex; } @@ -343,8 +347,7 @@ public String toString() { public static final Arguments DISABLED = new Arguments(OP_STAGE_JOB_CLEANUP, false, false, - false - ); + false); /** * Build an options argument from a configuration, using the diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java index 1618cf591a590..d3c66ee1d70a0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java @@ -237,7 +237,6 @@ private void deleteDirWithFile(Path dir) throws IOException { addToDirectoryMap(dir, DirMapState.fileNowDeleted); } - /** * Create a directory is required, updating the directory map * and, if the operation took place, the list of created dirs. @@ -323,7 +322,7 @@ private DirMapState maybeCreateOneDirectory(DirEntry dirEntry) throws IOExceptio // is bad: delete a file LOG.info("{}: Deleting file where a directory should go: {}", getName(), st); - delete(path, false, OP_DELETE_FILE_UNDER_DESTINATION); + deleteFile(path, OP_DELETE_FILE_UNDER_DESTINATION); } else { // is good. 
LOG.warn("{}: Even though mkdirs({}) failed, there is now a directory there", diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveTaskManifestStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveTaskManifestStage.java index fdaf0184cda20..4a17468c068e4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveTaskManifestStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveTaskManifestStage.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_SAVE_MANIFEST; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.TASK_COMMIT_RETRY_COUNT; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestTempPathForTaskAttempt; @@ -38,12 +39,32 @@ * Uses both the task ID and task attempt ID to determine the temp filename; * Before the rename of (temp, final-path), any file at the final path * is deleted. + *

* This is so that when this stage is invoked in a task commit, its output * overwrites any of the first commit. * When it succeeds, therefore, unless there is any subsequent commit of * another task, the task manifest at the final path is from this * operation. - * + *

+ * If the save and rename fails, there are a limited number of retries, with no sleep
+ * interval.
+ * This is to briefly try to recover from any transient rename() failure, including a
+ * race condition with any other task commit.
+ * <ol>
+ *   <li>If the previous task commit has already succeeded, this rename will overwrite it.
+ *   Both task attempts will report success.</li>
+ *   <li>If, after writing, another task attempt overwrites it, again, both
+ *   task attempts will report success.</li>
+ *   <li>If another task commits between the delete() and rename() operations, the retry will
+ *   attempt to recover by repeating the manifest write, and then report success.</li>
+ * </ol>
+ * This means that multiple task attempts may report success, but only one will have its
+ * manifest actually saved.
+ * The MapReduce and Spark committers only schedule a second task commit attempt if the first
+ * task attempt's commit operation fails or fails to report success in the allocated time.
+ * The overwrite-with-retry loop is an attempt to ensure that the second attempt will report
+ * success, if a partitioned cluster means that the original TA commit is still in progress.
+ *

* Returns the path where the manifest was saved. */ public class SaveTaskManifestStage extends @@ -73,8 +94,21 @@ protected Path executeStage(final TaskManifest manifest) getRequiredTaskId()); Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir, getRequiredTaskAttemptId()); - LOG.info("{}: Saving manifest file to {}", getName(), manifestFile); - save(manifest, manifestTempFile, manifestFile); + int limit = TASK_COMMIT_RETRY_COUNT; + boolean success = false; + do { + try { + LOG.info("{}: Saving manifest file to {}", getName(), manifestFile); + save(manifest, manifestTempFile, manifestFile); + success = true; + } catch (IOException e) { + LOG.warn("Failed to save manifest to {} via temp file {} and rename()", + manifestFile, manifestTempFile, e); + if (--limit < 0) { + throw e; + } + } + } while (!success); return manifestFile; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageConfig.java index b716d2f4b7f0c..3d77854905158 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageConfig.java @@ -32,6 +32,7 @@ import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_WRITER_QUEUE_CAPACITY; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.DELETE_DIR_CAPACITY; /** * Stage Config. @@ -172,6 +173,11 @@ public class StageConfig { */ private int successMarkerFileLimit = SUCCESS_MARKER_FILE_LIMIT; + /** + * Capacity for directory delete operations. + */ + private int deleteDirCapacity = DELETE_DIR_CAPACITY; + public StageConfig() { } @@ -604,6 +610,24 @@ public int getSuccessMarkerFileLimit() { return successMarkerFileLimit; } + /** + * Get the capacity for delete operations. + * @return the capacity + */ + public int getDeleteDirCapacity() { + return deleteDirCapacity; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public StageConfig withDeleteDirCapacity(final int value) { + deleteDirCapacity = value; + return this; + } + /** * Enter the stage; calls back to * {@link #enterStageEventHandler} if non-null. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md index da199a48d14c0..c63552f4c20a5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md @@ -17,7 +17,7 @@ This document how to use the _Manifest Committer_. 
-The _Manifest_ committer is a committer for work which provides +The _Manifest Committer_ is a committer for work which provides performance on ABFS for "real world" queries, and performance and correctness on GCS. It also works with other filesystems, including HDFS. @@ -523,14 +523,14 @@ And optional settings for debugging/performance analysis ``` spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory -spark.hadoop.fs.azure.io.rate.limit 10000 +spark.hadoop.fs.azure.io.rate.limit 1000 spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: URI of a directory for job summaries) ``` -## Experimental: ABFS Rename Rate Limiting `fs.azure.io.rate.limit` +## ABFS Rename Rate Limiting `fs.azure.io.rate.limit` To avoid triggering store throttling and backoff delays, as well as other throttling-related failure conditions file renames during job commit @@ -538,19 +538,23 @@ are throttled through a "rate limiter" which limits the number of rename operations per second a single instance of the ABFS FileSystem client may issue. -| Option | Meaning | -|--------|---------| -| `fs.azure.io.rate.limit` | Rate limit in operations/second for IO operations. | +| Option | Meaning | +|----------------------------------------------------|----------------------------------------------------| +| `fs.azure.io.rate.limit` | Rate limit in operations/second for IO operations. | +| `mapreduce.manifest.committer.delete.dir.capacity` | Write capacity to request for directory deletion. | +### Option `fs.azure.io.rate.limit` + +This is the number of IOPS to allocate for reading and writing during task and job commit. Set the option to `0` remove all rate limiting. -The default value of this is set to 10000, which is the default IO capacity for -an ADLS storage account. +The default value of this is set to 1000. + ```xml fs.azure.io.rate.limit - 10000 + 1000 maximum number of renames attempted per second ``` @@ -562,26 +566,53 @@ alone other applications sharing the same storage account. It will be shared with all jobs being committed by the same Spark driver, as these do share that filesystem connector. +### Option `mapreduce.manifest.committer.delete.dir.capacity` + +This option controls the amount of IO capacity requested for directory cleanup, abort, and any other +directory deletion. + +When deleting directory trees on Azure storage, the number of write operations +required is proportional to the number of subdirectories in the tree; +the deeper and wider the tree is, the more IOPS it consumes. + +The option `mapreduce.manifest.committer.delete.dir.capacity` +set the capacity to be asked by every directory deletion operation, such as during cleanup +and abort operations. + +When deleting task attempt directories in parallel (the default), each task attempt +directory deletion will ask for this capacity. That is: the capacity is not shared +across multiple task attempts, although the total store capacity, set by `fs.azure.io.rate.limit` +is. + +### Observing rate limiting + If rate limiting is imposed, the statistic `store_io_rate_limited` will report the time to acquire permits for committing files. 
If server-side throttling took place, signs of this can be seen in * The store service's logs and their throttling status codes (usually 503 or 500). -* The job statistic `commit_file_rename_recovered`. This statistic indicates that - ADLS throttling manifested as failures in renames, failures which were recovered - from in the comitter. +* The job statistic `commit_file_rename_recovered` in the `_SUCCESS` file. + This statistic indicates that ADLS throttling manifested as failures in renames, + failures which were recovered from in the committer. If these are seen -or other applications running at the same time experience throttling/throttling-triggered problems, consider reducing the value of `fs.azure.io.rate.limit`, and/or requesting a higher IO capacity from Microsoft. +All committer operations which perform filesystem IO will invoke the rate limiter +to acquire read or write capacity. + +Consider reducing the value of `fs.azure.io.rate.limit` if: +* Other operations in the cluster fail while Spark queries are committing work. +* The job commit/task commit operations are failing intermittently and store overload + is hypothesised to be the cause. Note: the committer is intended to be resilient to this. +* The storage account has reduced IO capacity. + *Important* if you do get extra capacity from Microsoft and you want to use it to speed up job commits, increase the value of `fs.azure.io.rate.limit` either across the cluster, or specifically for those jobs which you wish to allocate extra priority to. -This is still a work in progress; it may be expanded to support -all IO operations performed by a single filesystem instance. # Working with Google Cloud Storage diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractManifestCommitterTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractManifestCommitterTest.java index 5b64d544bc551..747d5444cbb36 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractManifestCommitterTest.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractManifestCommitterTest.java @@ -1013,8 +1013,12 @@ protected CleanupJobStage.Result cleanup( final int expectedDirsDeleted) throws IOException { StageConfig stageConfig = getJobStageConfig(); CleanupJobStage.Result result = new CleanupJobStage(stageConfig) - .apply(new CleanupJobStage.Arguments(OP_STAGE_JOB_CLEANUP, - enabled, deleteTaskAttemptDirsInParallel, suppressExceptions)); + .apply(new CleanupJobStage.Arguments( + OP_STAGE_JOB_CLEANUP, + enabled, + deleteTaskAttemptDirsInParallel, + suppressExceptions + )); assertCleanupResult(result, outcome, expectedDirsDeleted); return result; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/StubStoreOperations.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/StubStoreOperations.java index d9269767f0761..310ec8c9602c5 100644 ---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/StubStoreOperations.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/StubStoreOperations.java @@ -48,6 +48,12 @@ public boolean delete(final Path path, final boolean recursive) return true; } + @Override + public boolean rmdir(final Path path, final int capacity) + throws IOException { + return delete(path, true); + } + @Override public boolean mkdirs(final Path path) throws IOException { return true; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestJobThroughManifestCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestJobThroughManifestCommitter.java index 4bc2ce9bcf648..0ee9c2358a1ed 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestJobThroughManifestCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestJobThroughManifestCommitter.java @@ -598,7 +598,11 @@ public void test_0450_validationDetectsFailures() throws Throwable { public void test_0900_cleanupJob() throws Throwable { describe("Cleanup job"); CleanupJobStage.Arguments arguments = new CleanupJobStage.Arguments( - OP_STAGE_JOB_CLEANUP, true, true, false); + OP_STAGE_JOB_CLEANUP, + true, + true, + false + ); // the first run will list the three task attempt dirs and delete each // one before the toplevel dir. 
CleanupJobStage.Result result = new CleanupJobStage( diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java index 4dd7fe2dbcea5..99a64d94787aa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java @@ -176,7 +176,11 @@ public void testSaveThenLoadManyManifests() throws Throwable { // and skipping the rename stage (which is going to fail), // go straight to cleanup new CleanupJobStage(stageConfig).apply( - new CleanupJobStage.Arguments("", true, true, false)); + new CleanupJobStage.Arguments("", + true, + true, + false + )); heapinfo(heapInfo, "cleanup"); ManifestSuccessData success = createManifestOutcome(stageConfig, OP_STAGE_JOB_COMMIT); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java index 811fc704a2a33..a0ac07daefc06 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java @@ -294,6 +294,11 @@ public boolean delete(final Path path, final boolean recursive) return wrappedOperations.delete(path, recursive); } + @Override + public boolean rmdir(final Path path, final int capacity) throws IOException { + return delete(path, true); + } + @Override public boolean mkdirs(final Path path) throws IOException { maybeRaiseIOE("mkdirs", path, mkdirsToFail); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 8b6bc337fb21c..916dc74359fb8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -46,6 +46,7 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.impl.BackReference; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -68,6 +69,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.IORateLimiter; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.fs.PathIOException; @@ -103,8 +105,6 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.RateLimiting; -import org.apache.hadoop.util.RateLimitingFactory; import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.LambdaUtils; @@ -133,7 +133,7 @@ */ @InterfaceStability.Evolving public class AzureBlobFileSystem extends FileSystem - implements IOStatisticsSource { + implements IOStatisticsSource, IORateLimiter { public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class); private URI uri; private Path workingDir; @@ -155,9 +155,6 @@ public class AzureBlobFileSystem extends FileSystem /** Maximum Active blocks per OutputStream. */ private int blockOutputActiveBlocks; - /** Rate limiting for operations which use it to throttle their IO. */ - private RateLimiting rateLimiting; - /** Storing full path uri for better logging. */ private URI fullPathUri; @@ -187,6 +184,7 @@ public void initialize(URI uri, Configuration configuration) if (blockOutputActiveBlocks < 1) { blockOutputActiveBlocks = 1; } + LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri); // AzureBlobFileSystemStore with params in builder. AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder @@ -235,8 +233,6 @@ public void initialize(URI uri, Configuration configuration) } } - rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit()); - LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri); } @Override @@ -558,7 +554,8 @@ public Pair commitSingleFileByRename( } // acquire one IO permit - final Duration waitTime = rateLimiting.acquire(1); + final Duration waitTime = acquireIOCapacity(StoreStatisticNames.OP_RENAME, + qualifiedSrcPath, null, 1); try { final boolean recovered = abfsStore.rename(qualifiedSrcPath, @@ -1679,4 +1676,12 @@ public boolean hasPathCapability(final Path path, final String capability) public IOStatistics getIOStatistics() { return abfsCounters != null ? 
abfsCounters.getIOStatistics() : null; } + + @Override + public Duration acquireIOCapacity(final String operation, + final Path src, + final Path dest, final int requestedCapacity) { + return abfsStore.acquireIOCapacity(operation, src, null, requestedCapacity); + } + } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 8ece527e56a8d..7087eaf90a980 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -36,6 +36,7 @@ import java.nio.charset.CharsetEncoder; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -55,6 +56,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.IORateLimiter; import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; @@ -138,6 +140,8 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.BlockingThreadPoolExecutorService; +import org.apache.hadoop.util.RateLimiting; +import org.apache.hadoop.util.RateLimitingFactory; import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.http.client.utils.URIBuilder; @@ -160,13 +164,17 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_FILES; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_LOCATED_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_RENAME; /** * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage. */ @InterfaceAudience.Public @InterfaceStability.Evolving -public class AzureBlobFileSystemStore implements Closeable, ListingSupport { +public class AzureBlobFileSystemStore implements Closeable, ListingSupport, IORateLimiter { private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class); private AbfsClient client; @@ -204,6 +212,9 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { /** ABFS instance reference to be held by the store to avoid GC close. */ private BackReference fsBackRef; + /** Rate limiting for operations which use it to throttle their IO. */ + private final RateLimiting rateLimiting; + /** * FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations. 
* Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters @@ -282,6 +293,7 @@ public AzureBlobFileSystemStore( abfsConfiguration.getMaxWriteRequestsToQueue(), 10L, TimeUnit.SECONDS, "abfs-bounded"); + this.rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit()); } /** @@ -2211,4 +2223,33 @@ private void populateRenameRecoveryStatistics( abfsCounters.incrementCounter(METADATA_INCOMPLETE_RENAME_FAILURES, 1); } } + + /** + * Rate limiting. + *

+ * Some operations (list and rename) are considered more expensive and so whatever capacity + * is asked for is multiplied. + * {@inheritDoc} + */ + @Override + public Duration acquireIOCapacity(final String operation, final Path src, final Path dest, final int requestedCapacity) { + + double multiplier; + int lowCost = 1; + int mediumCost = 10; + switch (operation) { + case OP_LIST_FILES: + case OP_LIST_STATUS: + case OP_LIST_LOCATED_STATUS: + case OP_RENAME: + multiplier = mediumCost; + break; + default: + multiplier = lowCost; + } + final int capacity = (int) (requestedCapacity * multiplier); + LOG.debug("Acquiring IO capacity {} for operation: {}; multiplier: {}; final capacity: {}", + requestedCapacity, operation, multiplier, capacity); + return rateLimiting.acquire(capacity); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java index 6bfab3a8515a9..a5a7819617af1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java @@ -92,7 +92,7 @@ public void bindToFileSystem(FileSystem filesystem, Path path) throws IOExceptio etagsPreserved = true; LOG.debug("Bonded to filesystem with resilient commits under path {}", path); } catch (UnsupportedOperationException e) { - LOG.debug("No resilient commit support under path {}", path); + LOG.warn("No resilient commit support under path {}", path); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java index b760fa7a4ac53..01fd9ec50e42a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java @@ -20,10 +20,14 @@ import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory; @@ -41,10 +45,12 @@ @InterfaceStability.Evolving public class AzureManifestCommitterFactory extends ManifestCommitterFactory { + private static final Logger LOG = LoggerFactory.getLogger(AzureManifestCommitterFactory.class); + /** * Classname, which can be declared in job configurations. 
*/ - public static final String NAME = ManifestCommitterFactory.class.getName(); + public static final String NAME = AzureManifestCommitterFactory.class.getName(); @Override public ManifestCommitter createOutputCommitter(final Path outputPath, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index dd4d7edc6beda..485aa031d702a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -158,7 +158,7 @@ public final class FileSystemConfigurations { /** * IO rate limit. Value: {@value} */ - public static final int RATE_LIMIT_DEFAULT = 10_000; + public static final int RATE_LIMIT_DEFAULT = 1_000; private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java index 922782da29c5f..3d9e2e783403e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.azurebfs.commit; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.IORateLimiter; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; @@ -39,6 +42,9 @@ import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME; import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ABFS_IO_RATE_LIMIT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.RATE_LIMIT_DEFAULT; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_FILES; import static org.junit.Assume.assumeTrue; /** @@ -172,4 +178,35 @@ public void testEtagConsistencyAcrossRename() throws Throwable { .isEqualTo(srcTag); } + /** + * Verify IORateLimiter passes through to the ABFS store by + * making multiple requests greater than the rate limit, and + * verifying that the second request is delayed. + * The rate asked for is a fraction of the configured limit; + * it relies on the abfs store to use a multiplier + */ + @Test + public void testCapacityLimiting() throws Throwable { + describe("Verifying Rate Limiting"); + final Configuration conf = getConfiguration(); + // get the capacity per second, either the default or any override. 
+ final int size = conf.getInt(FS_AZURE_ABFS_IO_RATE_LIMIT, RATE_LIMIT_DEFAULT); + final int capacity = (int) (size/4.0); + final IORateLimiter limiter = createManifestStoreOperations(); + + // this operation is amplified; if a different name is used then + // the second assertion fails. + final String operation = OP_LIST_FILES; + // first one has no delay + Assertions.assertThat(limiter.acquireIOCapacity(operation, new Path("/"), null, capacity)) + .describedAs("Duration of acquiring %d capacity", capacity) + .isEqualTo(Duration.ZERO); + + // second one is delayed + final Duration duration = limiter.acquireIOCapacity(operation, new Path("/"), null, capacity); + describe("Duration of second capacity request of %d: %s", capacity, duration); + Assertions.assertThat(duration) + .describedAs("Duration of acquiring %d capacity", capacity) + .isGreaterThan(Duration.of(1, ChronoUnit.SECONDS)); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestRenameRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestRenameRecovery.java new file mode 100644 index 0000000000000..7806afbef89f5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestRenameRecovery.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations; +import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter; + +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE; +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME; +import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ABFS_IO_RATE_LIMIT; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConfig.createCloseableTaskSubmitter; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DELETE_DIR_CAPACITY; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.getEtag; +import static org.apache.hadoop.util.functional.FutureIO.awaitAllFutures; +import static org.assertj.core.api.Assumptions.assumeThat; + +/** + * HADOOP-19093: Scale test to attempt to generate rename failures. + *

+ * This test is intended to overload the storage account. + * So far it has been unable to trigger this problem, even against a very throttled + * account. + * The test asserts that the number of recovered renames is zero; + * if that assertion ever fails it means that a rename operation did fail + * and that the recovery was successful. + */ +public class ITestRenameRecovery extends AbstractAbfsIntegrationTest { + + private static final Logger LOG = LoggerFactory.getLogger( + ITestRenameRecovery.class); + + /** + * Time to sleep between checks for tasks to complete. + */ + public static final Duration SLEEP_INTERVAL = Duration.ofMillis(1000); + + /** + * Number of threads to use. + */ + private static final int THREAD_COUNT = 100; + + /** + * Number of renames to attempt per thread: {@value}. + */ + public static final int RENAMES = 100; + + /** + * Thread number; used for paths and messages. + */ + private final AtomicInteger threadNumber = new AtomicInteger(0); + + /** + * Flag to indicate that the test should exit: threads must check this. + */ + private final AtomicBoolean shouldExit = new AtomicBoolean(false); + + /** + * Any failure. + */ + private final AtomicReference<Throwable> failure = new AtomicReference<>(); + + /** + * How many renames were recovered from. + */ + private final AtomicLong renameRecoveries = new AtomicLong(0); + + /** + * Store operations through which renames are applied. + */ + private AbfsManifestStoreOperations storeOperations; + + /** + * Task pool submitter. + */ + private CloseableTaskPoolSubmitter submitter; + + /** + * Base directory for the tests. + */ + private Path baseDir; + + /** + * Time taken to rate limit. + * Uses a long rather than a Duration to allow atomic increments. + */ + private final AtomicLong rateLimitingTime = new AtomicLong(0); + + public ITestRenameRecovery() throws Exception { + } + + @Override + public void setup() throws Exception { + final Configuration conf = getRawConfiguration(); + assumeScaleTestsEnabled(conf); + conf.setInt(FS_AZURE_ABFS_IO_RATE_LIMIT, 50_000); + conf.setInt(OPT_DELETE_DIR_CAPACITY, 0); + super.setup(); + + final AzureBlobFileSystem fs = getFileSystem(); + baseDir = new Path("/" + getMethodName()); + fs.mkdirs(baseDir); + assumeThat(fs.hasPathCapability(baseDir, ETAGS_AVAILABLE)) + .describedAs("Etags must be available in the store") + .isTrue(); + assumeThat(fs.hasPathCapability(baseDir, ETAGS_PRESERVED_IN_RENAME)) + .describedAs("Etags must be preserved across renames") + .isTrue(); + storeOperations = new AbfsManifestStoreOperations(); + storeOperations.bindToFileSystem(fs, baseDir); + submitter = createCloseableTaskSubmitter(THREAD_COUNT, "ITestRenameRecovery"); + } + + @Override + public void teardown() throws Exception { + submitter.close(); + try { + getFileSystem().delete(baseDir, true); + } catch (IOException e) { + LOG.warn("Failed to delete {}", baseDir, e); + } + super.teardown(); + } + + @Test + public void testResilientRename() throws Throwable { + + List<Future<?>> futures = new ArrayList<>(THREAD_COUNT); + final AzureBlobFileSystem fs = getFileSystem(); + + // for every worker, create a file and submit a rename worker + for (int i = 0; i < THREAD_COUNT; i++) { + + final int id = threadNumber.incrementAndGet(); + final Path source = new Path(baseDir, "source-" + id); + final Path dest = new Path(baseDir, "dest-" + id); + // create the file in the junit test rather than in parallel, + // as parallel creation caused OOM on buffer allocation.
+ fs.create(source, true).close(); + + // submit the work + futures.add(submitter.submit(() -> + renameWorker(id, fs, source, dest))); + } + + // now wait for the futures + awaitAllFutures(futures, SLEEP_INTERVAL); + LOG.info("Rate limiting time: {}", Duration.ofMillis(rateLimitingTime.get())); + + // throw any failure which occurred. + if (failure.get() != null) { + throw failure.get(); + } + final String stats = ioStatisticsToPrettyString(fs.getIOStatistics()); + LOG.info("Store IO Statistics: {}", stats); + + Assertions.assertThat(renameRecoveries.get()) + .describedAs("Rename recovery took place; statistics %s", stats) + .isEqualTo(0); + } + + /** + * Worker to repeatedly rename a file. + * @param id worker ID + * @param fs filesystem + * @param source source path + * @param dest destination path + */ + private void renameWorker( + final int id, + AzureBlobFileSystem fs, final Path source, + final Path dest) { + int recoveries = 0; + Duration totalWaitTime = Duration.ZERO; + try { + LOG.info("Starting thread {}", id); + + int limit = RENAMES; + final FileStatus st = fs.getFileStatus(source); + + final String etag = getEtag(st); + Assertions.assertThat(etag) + .describedAs("Etag of %s", st) + .isNotNull() + .isNotEmpty(); + FileEntry forwards = new FileEntry(source, dest, 0, etag); + FileEntry backwards = new FileEntry(dest, source, 0, etag); + int count = 0; + while (!shouldExit.get() && count < limit) { + FileEntry entry = count % 2 == 0 ? forwards : backwards; + count++; + final ManifestStoreOperations.CommitFileResult result = + storeOperations.commitFile(entry); + final boolean recovered = result.recovered(); + totalWaitTime = totalWaitTime.plus(result.getWaitTime()); + if (recovered) { + recoveries++; + noteRecovery(entry); + } + final Path target = entry.getDestPath(); + final FileStatus de = fs.getFileStatus(target); + Assertions.assertThat(getEtag(de)) + .describedAs("Etag of %s with recovery=%s", de, recovered) + .isNotNull() + .isEqualTo(etag); + } + // clean up the files. + storeOperations.deleteFile(source); + storeOperations.deleteFile(dest); + + } catch (Throwable e) { + noteFailure(e); + } finally { + LOG.info("Thread {} exiting with recovery count of {} and total wait time of {}", + id, recoveries, totalWaitTime); + rateLimitingTime.addAndGet(totalWaitTime.toMillis()); + } + } + + /** + * Note that a recovery has been made. + * @param entry entry which was recovered. + */ + private void noteRecovery(final FileEntry entry) { + LOG.info("Recovered from rename(failure) {}", entry); + renameRecoveries.incrementAndGet(); + // we know recovery worked, so we can stop the test. + shouldExit.set(true); + } + + /** + * Note a rename failure. + * @param e exception. + */ + private void noteFailure(final Throwable e) { + LOG.error("Rename failed", e); + failure.compareAndSet(null, e); + shouldExit.set(true); + } + +}
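For applications outside the committer, the pattern exercised by these tests reduces to: check whether the target FileSystem exports `IORateLimiter`, acquire capacity for the operation about to be issued, then perform the operation. The sketch below is illustrative only; the class name and argument handling are invented for the example. It uses the `acquireIOCapacity(operation, src, dest, capacity)` call added by this change and the existing `StoreStatisticNames.OP_RENAME` operation name.

```java
import java.time.Duration;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.IORateLimiter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;

/** Illustrative sketch: acquire rename capacity before issuing a rename. */
public final class RateLimitedRenameExample {
  public static void main(String[] args) throws Exception {
    // args[0] and args[1] are assumed to be source and destination paths.
    Path src = new Path(args[0]);
    Path dest = new Path(args[1]);
    FileSystem fs = src.getFileSystem(new Configuration());
    if (fs instanceof IORateLimiter) {
      // ask for one unit of rename capacity; this may block if the limiter
      // shared by this filesystem instance has no capacity left.
      Duration wait = ((IORateLimiter) fs).acquireIOCapacity(
          StoreStatisticNames.OP_RENAME, src, dest, 1);
      System.out.println("waited " + wait + " for rename capacity");
    }
    fs.rename(src, dest);
  }
}
```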