HADOOP-19093. [ABFS] Improve rate limiting through ABFS in Manifest Committer #6596

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs;

import java.time.Duration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* An optional interface for classes that provide rate limiters.
* For a filesystem source, the operation name SHOULD be one of
* those listed in
* {@link org.apache.hadoop.fs.statistics.StoreStatisticNames}
* if the operation is listed there.
* <p>
* This interface is intended to be exported by filesystems so that
* applications wishing to perform bulk operations may request access
* to a rate limiter <i>which is shared across all threads interacting
* with the store</i>.
* That is: the rate limiting is global to the specific instance of the
* object implementing this interface.
* <p>
* It is not expected to be shared with other instances of the same
* class, or across processes.
* <p>
* This means it is primarily of benefit when limiting bulk operations
* which can overload an (object) store from a small pool of threads.
* Examples of this can include:
* <ul>
* <li>Bulk delete operations</li>
* <li>Bulk rename operations</li>
* <li>Completing many in-progress uploads</li>
* <li>Deep and wide recursive treewalks</li>
* <li>Reading/prefetching many blocks within a file</li>
* </ul>
* In cluster applications, rate limiting is most likely to be
* useful during job commit operations, especially in processes such
* as Spark drivers, which may be committing work from multiple jobs over
* a short period of time.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface IORateLimiter {

/**
* Acquire IO capacity.
* <p>
* The implementation may assign different costs to the different
* operations.
* <p>
* If there is not enough capacity, the permits will still be acquired,
* but the subsequent call will block until the capacity has been
* refilled.
* <p>
* The path parameters are used to support stores where throttling
* may differ under different paths.
* @param operation operation being performed. Must not be null, may be "",
* should be from {@link org.apache.hadoop.fs.statistics.StoreStatisticNames}
* where there is a matching operation.
* @param src path under which the operations will be initiated.
* @param dest destination path for rename operations
* @param requestedCapacity capacity to acquire.
* Must be greater than or equal to 0.
* @return time spent waiting to acquire the capacity.
*/
Duration acquireIOCapacity(String operation, Path src, Path dest, int requestedCapacity);

}
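
A minimal usage sketch, not part of this patch: it assumes the target FileSystem instance exports IORateLimiter, as the javadoc above intends, and that a null dest is acceptable for non-rename operations.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.IORateLimiter;
import org.apache.hadoop.fs.Path;

/** Sketch only: a rate-limited bulk delete from a pool of worker threads. */
static void rateLimitedDelete(FileSystem fs, Iterable<Path> paths)
    throws IOException {
  // Assumption: this filesystem implements IORateLimiter.
  IORateLimiter limiter = (IORateLimiter) fs;
  for (Path path : paths) {
    // Blocks until the limiter, shared across all threads interacting
    // with this store, grants one unit of delete capacity.
    limiter.acquireIOCapacity("op_delete", path, null, 1);
    fs.delete(path, true);
  }
}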
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import org.apache.hadoop.fs.IORateLimiter;
import org.apache.hadoop.util.RateLimiting;
import org.apache.hadoop.util.RateLimitingFactory;

import static java.util.Objects.requireNonNull;

/**
* Implementation support for the IO rate limiter.
*/
public final class IORateLimiterSupport {

private IORateLimiterSupport() {
}

/**
* Get a rate limiter source which has no rate limiting.
* @return a rate limiter source which has no rate limiting.
*/
public static IORateLimiter unlimited() {
return (operation, src, dest, requestedCapacity) -> {
requireNonNull(operation, "operation");
return RateLimitingFactory.unlimitedRate().acquire(requestedCapacity);
};
}

/**
* Create a rate limiter with a fixed capacity.
* @param capacityPerSecond capacity per second.
* @return a rate limiter.
*/
public static IORateLimiter create(int capacityPerSecond) {
final RateLimiting limiting = RateLimitingFactory.create(capacityPerSecond);
return (operation, src, dest, requestedCapacity) -> {
requireNonNull(operation, "operation");
return limiting.acquire(requestedCapacity);
};
}
}
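
A hedged example of how these factories might be wired up; the variable names and paths are illustrative, not from the patch.

// Shared limiter: 100 operations per second across all threads using it.
IORateLimiter limiter = IORateLimiterSupport.create(100);
Duration waited = limiter.acquireIOCapacity(
    StoreStatisticNames.OP_RENAME, source, dest, 1);

// For tests, or for stores with no throttling concerns: never blocks.
IORateLimiter unlimited = IORateLimiterSupport.unlimited();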
@@ -63,6 +63,9 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_DELETE = "op_delete";

/** {@value}. */
public static final String OP_DELETE_DIR = "op_delete_dir";

/** {@value}. */
public static final String OP_EXISTS = "op_exists";

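The new OP_DELETE_DIR constant gives IORateLimiter implementations and callers a standard operation name for directory deletion. A hypothetical call site (this excerpt does not show the committer's real call sites, and it assumes a null dest is acceptable for a non-rename operation):

limiter.acquireIOCapacity(StoreStatisticNames.OP_DELETE_DIR, dirPath, null, 1);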
@@ -28,8 +28,10 @@
* Can be used to throttle use of object stores where excess load
* will trigger cluster-wide throttling, backoff etc. and so collapse
* performance.
* <p>
* The time waited is returned as a Duration type.
* <p>
* The Google rate limiter implements rate limiting by allowing a caller to ask for
* more capacity than is available. This will be granted,
* but the subsequent request will be blocked if the bucket of
* capacity hasn't yet refilled to the point where there is
@@ -44,8 +46,11 @@ public interface RateLimiting {
* If there is not enough space, the permits will be acquired,
* but the subsequent call will block until the capacity has been
* refilled.
* <p>
* If the capacity is zero, no delay will take place.
* @param requestedCapacity capacity to acquire.
* Must be greater than or equal to 0.
* @return time spent waiting to acquire the capacity.
*/
Duration acquire(int requestedCapacity);

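A short sketch of the semantics described above, built with the factory from this patch; the comments are expectations taken from the javadoc, not measured values.

RateLimiting limiter = RateLimitingFactory.create(10); // 10 units/second
Duration d1 = limiter.acquire(100); // over-acquisition is granted immediately
Duration d2 = limiter.acquire(1);   // blocks until the bucket has refilled
Duration d3 = limiter.acquire(0);   // zero capacity: returns instantly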
@@ -24,6 +24,8 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;

import static org.apache.hadoop.util.Preconditions.checkArgument;

/**
* Factory for Rate Limiting.
* This should be the only place in the code where the guava RateLimiter is imported.
@@ -50,6 +52,7 @@ private static class NoRateLimiting implements RateLimiting {

@Override
public Duration acquire(int requestedCapacity) {
checkArgument(requestedCapacity >= 0, "requestedCapacity must be >= 0");
return INSTANTLY;
}
}
@@ -70,6 +73,11 @@ private RestrictedRateLimiting(int capacityPerSecond) {

@Override
public Duration acquire(int requestedCapacity) {
checkArgument(requestedCapacity >= 0, "requestedCapacity must be >= 0");
if (requestedCapacity == 0) {
// the google rate limiter rejects a request for zero permits, so return instantly.
return INSTANTLY;
}
final double delayMillis = limiter.acquire(requestedCapacity);
return delayMillis == 0
? INSTANTLY
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -29,6 +31,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -55,6 +60,9 @@
@InterfaceStability.Unstable
public final class FutureIO {

private static final Logger LOG =
LoggerFactory.getLogger(FutureIO.class);

private FutureIO() {
}

@@ -275,4 +283,44 @@ public static <T> CompletableFuture<T> eval(
}
return result;
}

/**
* Wait for all the futures to complete; the caller should pass in a
* small sleep interval to use between polling iterations: enough to
* yield the CPU.
* @param futures futures to wait for.
* @param sleepInterval interval to sleep between checks for completion.
*/
public static void awaitAllFutures(final Collection<Future<?>> futures,
final Duration sleepInterval) {
int size = futures.size();
LOG.debug("Waiting for {} tasks to complete", size);
if (size == 0) {
// shortcut for empty list.
return;
}
int oldNumFinished = 0;
while (true) {
int numFinished = (int) futures.stream()
.filter(Future::isDone)
.count();

if (oldNumFinished != numFinished) {
LOG.debug("Finished count -> {}/{}", numFinished, size);
oldNumFinished = numFinished;
}

if (numFinished == size) {
// all of the futures are done, stop looping
break;
} else {
try {
Thread.sleep(sleepInterval.toMillis());
} catch (InterruptedException e) {
futures.forEach(future -> future.cancel(true));
Thread.currentThread().interrupt();
break;
}
}
}
}
}
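
A hedged usage sketch of this helper; the executor setup and task list are illustrative only.

ExecutorService pool = Executors.newFixedThreadPool(4);
List<Future<?>> futures = new ArrayList<>();
for (Runnable task : tasks) {
  futures.add(pool.submit(task));
}
// Poll until every future is done, sleeping 10ms between checks;
// on interruption, all futures are cancelled and the interrupt flag restored.
FutureIO.awaitAllFutures(futures, Duration.ofMillis(10));
pool.shutdown();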
@@ -19,6 +19,7 @@
package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -40,6 +41,7 @@
import org.apache.hadoop.fs.statistics.IOStatisticsContext;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.functional.FutureIO.awaitAllFutures;
import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;

/**
@@ -466,7 +468,7 @@ private <E extends Exception> boolean runParallel(final Task<I, E> task)
taskFailed.set(true);
}
// let the above tasks complete (or abort)
waitFor(futures, sleepInterval);
awaitAllFutures(futures, Duration.ofMillis(sleepInterval));
int futureCount = futures.size();
futures.clear();

@@ -498,7 +500,7 @@ private <E extends Exception> boolean runParallel(final Task<I, E> task)
}

// let the revert tasks complete
waitFor(futures, sleepInterval);
awaitAllFutures(futures, Duration.ofMillis(sleepInterval));
}

// give priority to execution exceptions over
@@ -539,39 +541,6 @@ private void resetStatisticsContext() {
}
}

/**
* Wait for all the futures to complete; there's a small sleep between
* each iteration; enough to yield the CPU.
* @param futures futures.
* @param sleepInterval Interval in milliseconds to await completion.
*/
private static void waitFor(Collection<Future<?>> futures, int sleepInterval) {
int size = futures.size();
LOG.debug("Waiting for {} tasks to complete", size);
int oldNumFinished = 0;
while (true) {
int numFinished = (int) futures.stream().filter(Future::isDone).count();

if (oldNumFinished != numFinished) {
LOG.debug("Finished count -> {}/{}", numFinished, size);
oldNumFinished = numFinished;
}

if (numFinished == size) {
// all of the futures are done, stop looping
break;
} else {
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
futures.forEach(future -> future.cancel(true));
Thread.currentThread().interrupt();
break;
}
}
}
}

/**
* Create a task builder for the iterable.
* @param items item source.