Skip to content

Commit

Permalink
HADOOP-19093. differentiating deleteFile and rmdir better.
Browse files Browse the repository at this point in the history
Given up trying to make the load test work, instead created it
to a scale test. If it ever does now trigger failure,
the output should be insightful.

Change-Id: I15e78c7006a99661d62113c253679a27cbf914f8
  • Loading branch information
steveloughran committed Mar 26, 2024
1 parent 042cd73 commit 70fe8c4
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -57,6 +60,9 @@
@InterfaceStability.Unstable
public final class FutureIO {

private static final Logger LOG =
LoggerFactory.getLogger(FutureIO.class);

private FutureIO() {
}

Expand Down Expand Up @@ -279,12 +285,42 @@ public static <T> CompletableFuture<T> eval(
}

/**
* Wait for all the futures to complete; there's a small sleep between
* each iteration; enough to yield the CPU.
* Wait for all the futures to complete; the caller should pass down
* small sleep interval between each iteration; enough to yield the CPU.
* @param futures futures.
* @param sleepInterval Interval to await completion.
* @param sleepInterval Interval in milliseconds to await completion.
*/
public static void awaitAllFutures(Collection<Future<?>> futures, Duration sleepInterval) {
TaskPool.waitFor(futures, (int)sleepInterval.toMillis());
public static void awaitAllFutures(final Collection<Future<?>> futures,
final Duration sleepInterval) {
int size = futures.size();
LOG.debug("Waiting for {} tasks to complete", size);
if (size == 0) {
// shortcut for empty list.
return;
}
int oldNumFinished = 0;
while (true) {
int numFinished = (int) futures.stream()
.filter(Future::isDone)
.count();

if (oldNumFinished != numFinished) {
LOG.debug("Finished count -> {}/{}", numFinished, size);
oldNumFinished = numFinished;
}

if (numFinished == size) {
// all of the futures are done, stop looping
break;
} else {
try {
Thread.sleep(sleepInterval.toMillis());
} catch (InterruptedException e) {
futures.forEach(future -> future.cancel(true));
Thread.currentThread().interrupt();
break;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -40,6 +41,7 @@
import org.apache.hadoop.fs.statistics.IOStatisticsContext;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.functional.FutureIO.awaitAllFutures;
import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;

/**
Expand Down Expand Up @@ -466,7 +468,7 @@ private <E extends Exception> boolean runParallel(final Task<I, E> task)
taskFailed.set(true);
}
// let the above tasks complete (or abort)
waitFor(futures, sleepInterval);
awaitAllFutures(futures, Duration.ofMillis(sleepInterval));
int futureCount = futures.size();
futures.clear();

Expand Down Expand Up @@ -498,7 +500,7 @@ private <E extends Exception> boolean runParallel(final Task<I, E> task)
}

// let the revert tasks complete
waitFor(futures, sleepInterval);
awaitAllFutures(futures, Duration.ofMillis(sleepInterval));
}

// give priority to execution exceptions over
Expand Down Expand Up @@ -539,40 +541,6 @@ private void resetStatisticsContext() {
}
}

/**
* Wait for all the futures to complete; there's a small sleep between
* each iteration; enough to yield the CPU.
* @param futures futures.
* @param sleepInterval Interval in milliseconds to await completion.
*/
@InterfaceAudience.Private
public static void waitFor(Collection<Future<?>> futures, int sleepInterval) {
int size = futures.size();
LOG.debug("Waiting for {} tasks to complete", size);
int oldNumFinished = 0;
while (true) {
int numFinished = (int) futures.stream().filter(Future::isDone).count();

if (oldNumFinished != numFinished) {
LOG.debug("Finished count -> {}/{}", numFinished, size);
oldNumFinished = numFinished;
}

if (numFinished == size) {
// all of the futures are done, stop looping
break;
} else {
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
futures.forEach(future -> future.cancel(true));
Thread.currentThread().interrupt();
break;
}
}
}
}

/**
* Create a task builder for the iterable.
* @param items item source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,29 @@ public abstract boolean delete(Path path, boolean recursive)
throws IOException;

/**
* Delete Directory to {@code FileSystem#delete(Path, true)}.
* Forward to {@code delete(Path, true)}
* unless overridden.
* <p>
* If it returns without an error: there is nothing at
* the end of the path.
* @param path path
* @return outcome
* @throws IOException failure.
*/
public boolean deleteFile(Path path)
throws IOException {
return delete(path, true);
}

/**
* Acquire the delete capacity then call {@code FileSystem#delete(Path, true)}
* or equivalent.
* <p>
* If it returns without an error: there is nothing at
* the end of the path.
* @param path path
* @param capacity IO capacity to ask for.
* @return true if the path was deleted.
* @return outcome
* @throws IOException failure.
*/
public abstract boolean rmdir(Path path, int capacity)
Expand Down Expand Up @@ -317,8 +334,7 @@ public IORateLimiter rateLimiterSource() {
*/
@Override
public Duration acquireIOCapacity(final String operation, final int requestedCapacity) {
final Duration duration = rateLimiterSource().acquireIOCapacity(operation, requestedCapacity);
return duration;
return rateLimiterSource().acquireIOCapacity(operation, requestedCapacity);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public boolean isFile(Path path) throws IOException {
return fileSystem.isFile(path);
}

/**
* Delete a path.
* The capacity to acquire is based on the recursive flag.
* {@inheritDoc}
*/
@Override
public boolean delete(Path path, boolean recursive)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,17 +454,20 @@ protected Boolean delete(

/**
* Delete a file at a path.
* <p>
* If it returns without an error: there is nothing at
* the end of the path.
* @param path path
* @param statistic statistic to update
* @return status or null
* @return outcome.
* @throws IOException IO Failure.
*/
protected Boolean deleteFile(
protected boolean deleteFile(
final Path path,
final String statistic)
throws IOException {
return trackDuration(getIOStatistics(), statistic, () ->
operations.delete(path, false));
operations.deleteFile(path));
}

/**
Expand Down Expand Up @@ -945,7 +948,7 @@ protected final TaskPool.Submitter getIOProcessors(int size) {
*/
protected boolean deleteDir(
final Path dir,
String statistic)
final String statistic)
throws IOException {
return trackDuration(getIOStatistics(), statistic, () ->
operations.rmdir(dir, stageConfig.getDeleteDirCapacity()));
Expand All @@ -959,10 +962,10 @@ protected boolean deleteDir(
*/
protected IOException deleteDirSuppressingExceptions(
final Path dir,
String statistic)
final String statistic)
throws IOException {
try {
deleteDir(dir, OP_DELETE_DIR);
deleteDir(dir, statistic);
return null;
} catch (IOException ex) {
LOG.info("Error deleting {}: {}", dir, ex.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public boolean delete(final Path path, final boolean recursive)
@Override
public boolean rmdir(final Path path, final int capacity)
throws IOException {
return true;
return delete(path, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2234,7 +2234,6 @@ private void populateRenameRecoveryStatistics(
@Override
public Duration acquireIOCapacity(final String operation, final int requestedCapacity) {


double multiplier;
int lowCost = 1;
int mediumCost = 10;
Expand All @@ -2248,7 +2247,7 @@ public Duration acquireIOCapacity(final String operation, final int requestedCap
default:
multiplier = lowCost;
}
final int capacity = (int)(requestedCapacity * multiplier);
final int capacity = (int) (requestedCapacity * multiplier);
LOG.debug("Acquiring IO capacity {} for operation: {}; multiplier: {}; final capacity: {}",
requestedCapacity, operation, multiplier, capacity);
return rateLimiting.acquire(capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ABFS_IO_RATE_LIMIT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.RATE_LIMIT_DEFAULT;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_FILES;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_RENAME;
import static org.junit.Assume.assumeTrue;

/**
Expand Down Expand Up @@ -190,9 +189,9 @@ public void testEtagConsistencyAcrossRename() throws Throwable {
public void testCapacityLimiting() throws Throwable {
describe("Verifying Rate Limiting");
final Configuration conf = getConfiguration();
// get the capacity per second
// get the capacity per second, either the default or any override.
final int size = conf.getInt(FS_AZURE_ABFS_IO_RATE_LIMIT, RATE_LIMIT_DEFAULT);
final int capacity = (int)(size/4.0);
final int capacity = (int) (size/4.0);
final IORateLimiter limiter = createManifestStoreOperations();

// this operation is amplified; if a different name is used then
Expand Down
Loading

0 comments on commit 70fe8c4

Please sign in to comment.