diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
index d8ab16f41d3ac..6808bdc59530a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
@@ -31,6 +31,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.net.NetUtils.getHostname;
+
 /** An implementation of a round-robin scheme for disk allocation for creating
  * files. The way it works is that it is kept track what disk was last
  * allocated for a file write. For the current request, the next disk from
@@ -65,7 +67,10 @@
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
 @InterfaceStability.Unstable
 public class LocalDirAllocator {
-  
+
+  static final String E_NO_SPACE_AVAILABLE =
+      "No space available in any of the local directories";
+
   //A Map from the config item names like "mapred.local.dir"
   //to the instance of the AllocatorPerContext. This
   //is a static object to make sure there exists exactly one instance per JVM
@@ -384,6 +389,24 @@ int getCurrentDirectoryIndex() {
     return currentContext.get().dirNumLastAccessed.get();
   }
 
+  /**
+   * Format a string, log at debug and append it to the history as a new line.
+   *
+   * @param history history to fill in
+   * @param fmt format string
+   * @param args varargs
+   */
+  private void note(StringBuilder history, String fmt, Object... args) {
+    try {
+      final String s = String.format(fmt, args);
+      history.append(s).append("\n");
+      LOG.debug(s);
+    } catch (Exception e) {
+      // some resilience in case the format string is wrong
+      LOG.debug(fmt, e);
+    }
+  }
+
   /** Get a path from the local FS. If size is known, we go
    * round-robin over the set of disks (via the configured dirs) and return
    * the first complete path which has enough space.
@@ -393,6 +416,12 @@ int getCurrentDirectoryIndex() {
    */
   public Path getLocalPathForWrite(String pathStr, long size,
       Configuration conf, boolean checkWrite) throws IOException {
+
+    // history is built up and logged at error if the allocation fails
+    StringBuilder history = new StringBuilder();
+
+    note(history, "Searching for a directory for file \"%s\", size = %,d; checkWrite=%s",
+        pathStr, size, checkWrite);
     Context ctx = confChanged(conf);
     int numDirs = ctx.localDirs.length;
     int numDirsSearched = 0;
@@ -406,27 +435,62 @@ public Path getLocalPathForWrite(String pathStr, long size,
       pathStr = pathStr.substring(1);
     }
     Path returnPath = null;
-    
-    if(size == SIZE_UNKNOWN) {  //do roulette selection: pick dir with probability
-                    //proportional to available size
-      long[] availableOnDisk = new long[ctx.dirDF.length];
-      long totalAvailable = 0;
-      
-      //build the "roulette wheel"
-      for(int i =0; i < ctx.dirDF.length; ++i) {
-        final DF target = ctx.dirDF[i];
-        // attempt to recreate the dir so that getAvailable() is valid
-        // if it fails, getAvailable() will return 0, so the dir will
-        // be declared unavailable.
-        // return value is logged at debug to keep spotbugs quiet.
-        final boolean b = new File(target.getDirPath()).mkdirs();
-        LOG.debug("mkdirs of {}={}", target, b);
-        availableOnDisk[i] = target.getAvailable();
+
+    final int dirCount = ctx.dirDF.length;
+    long[] availableOnDisk = new long[dirCount];
+    long totalAvailable = 0;
+
+    StringBuilder pathNames = new StringBuilder();
+
+    //build the "roulette wheel"
+    for (int i = 0; i < dirCount; ++i) {
+      final DF target = ctx.dirDF[i];
+      // attempt to recreate the dir so that getAvailable() is valid
+      // if it fails, getAvailable() will return 0, so the dir will
+      // be declared unavailable.
+      // return value is logged at debug to keep spotbugs quiet.
+      final String name = target.getDirPath();
+      pathNames.append(" ").append(name);
+      final File dirPath = new File(name);
+
+      // existence probe with directory recreation
+      if (!dirPath.exists()) {
+        LOG.debug("Creating buffer dir {}", name);
+        if (dirPath.mkdirs()) {
+          note(history, "Created buffer dir %s", name);
+        } else {
+          note(history, "Failed to create buffer dir %s", name);
+        }
+      }
+
+      // the path already existed or the mkdirs call had an outcome;
+      // make sure the path is present and a dir, and if so add its availability
+      if (dirPath.isDirectory()) {
+        final long available = target.getAvailable();
+        availableOnDisk[i] = available;
+        note(history, "%,d bytes available under path %s", available, name);
         totalAvailable += availableOnDisk[i];
+      } else {
+        note(history, "%s does not exist/is not a directory", name);
       }
+    }
 
-      if (totalAvailable == 0){
-        throw new DiskErrorException("No space available in any of the local directories.");
+    note(history, "Directory count is %d; total available capacity is %,d",
+        dirCount, totalAvailable);
+
+    if (size == SIZE_UNKNOWN) {
+      //do roulette selection: pick dir with probability
+      // proportional to available size
+      note(history, "Size not specified, so picking directories at random.");
+
+      if (totalAvailable == 0) {
+        // log error and history
+        String newErrorText = E_NO_SPACE_AVAILABLE + pathNames +
+            " on host " + getHostname();
+        LOG.error(newErrorText);
+        LOG.error(history.toString());
+        // then raise the exception
+        throw new DiskErrorException(newErrorText);
       }
 
       // Keep rolling the wheel till we get a valid path
@@ -439,14 +503,20 @@ public Path getLocalPathForWrite(String pathStr, long size,
           dir++;
         }
         ctx.dirNumLastAccessed.set(dir);
-        returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
+        final Path localDir = ctx.localDirs[dir];
+        returnPath = createPath(localDir, pathStr, checkWrite);
         if (returnPath == null) {
           totalAvailable -= availableOnDisk[dir];
           availableOnDisk[dir] = 0; // skip this disk
           numDirsSearched++;
+          note(history, "No capacity in %s", localDir);
+        } else {
+          note(history, "Allocated file %s in %s", returnPath, localDir);
         }
       }
     } else {
+      note(history, "Requested file size is %,d; searching for a suitable directory",
+          size);
       // Start linear search with random increment if possible
       int randomInc = 1;
       if (numDirs > 2) {
@@ -459,17 +529,22 @@ public Path getLocalPathForWrite(String pathStr, long size,
           maxCapacity = capacity;
         }
         if (capacity > size) {
+          final Path localDir = ctx.localDirs[dirNum];
           try {
-            returnPath = createPath(ctx.localDirs[dirNum], pathStr,
-                checkWrite);
+            returnPath = createPath(localDir, pathStr, checkWrite);
           } catch (IOException e) {
             errorText = e.getMessage();
             diskException = e;
-            LOG.debug("DiskException caught for dir {}", ctx.localDirs[dirNum], e);
+            note(history, "Exception while creating path %s: %s", localDir, errorText);
+            LOG.debug("DiskException caught for dir {}", localDir, e);
           }
           if (returnPath != null) {
+            // success
             ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
+            note(history, "Allocated file %s in %s", returnPath, localDir);
             break;
+          } else {
+            note(history, "No capacity in %s", localDir);
           }
         }
         dirNum++;
@@ -482,12 +557,18 @@ public Path getLocalPathForWrite(String pathStr, long size,
     }
 
     //no path found
-    String newErrorText = "Could not find any valid local directory for " +
-        pathStr + " with requested size " + size +
-        " as the max capacity in any directory is " + maxCapacity;
+    String hostname = getHostname();
+    String newErrorText = "Could not find any valid local directory for " +
+        pathStr + " with requested size " + size +
+        " on host " + hostname +
+        " as the max capacity in any directory" +
+        " (" + pathNames + " )" +
+        " is " + maxCapacity;
     if (errorText != null) {
       newErrorText = newErrorText + " due to " + errorText;
     }
+    LOG.error(newErrorText);
+    LOG.error(history.toString());
     throw new DiskErrorException(newErrorText, diskException);
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
index 3693b4f0acde3..841b09be3f0a8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
@@ -35,6 +35,8 @@
 import org.junit.runners.Parameterized.Parameters;
 import org.junit.Test;
 
+import static org.apache.hadoop.fs.LocalDirAllocator.E_NO_SPACE_AVAILABLE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
 import static org.junit.Assert.*;
 
@@ -524,13 +526,8 @@ public void testRemoveContext() throws IOException {
   @Test(timeout = 30000)
   public void testGetLocalPathForWriteForInvalidPaths() throws Exception {
     conf.set(CONTEXT, " ");
-    try {
-      dirAllocator.getLocalPathForWrite("/test", conf);
-      fail("not throwing the exception");
-    } catch (IOException e) {
-      assertEquals("Incorrect exception message",
-          "No space available in any of the local directories.", e.getMessage());
-    }
+    intercept(IOException.class, E_NO_SPACE_AVAILABLE, () ->
+        dirAllocator.getLocalPathForWrite("/test", conf));
   }
 
   /**
@@ -567,5 +564,26 @@ public void testDirectoryRecovery() throws Throwable {
     // and expect to get a new file back
     dirAllocator.getLocalPathForWrite("file2", -1, conf);
   }
+
+  /**
+   * Test for HADOOP-19554. LocalDirAllocator still doesn't always recover
+   * from directory tree deletion.
+   */
+  @Test(timeout = 30000)
+  public void testDirectoryRecoveryKnownSize() throws Throwable {
+    String dir0 = buildBufferDir(ROOT, 0);
+    String subdir = dir0 + "/subdir1/subdir2";
+
+    conf.set(CONTEXT, subdir);
+    // get local path and an ancestor
+    final Path pathForWrite = dirAllocator.getLocalPathForWrite("file", 512, conf);
+    final Path ancestor = pathForWrite.getParent().getParent();
+
+    // delete that ancestor
+    localFs.delete(ancestor, true);
+    // and expect to get a new file back; the size must be known to
+    // exercise the linear-search branch this JIRA fixes
+    dirAllocator.getLocalPathForWrite("file2", 512, conf);
+  }
+
 }
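
Reviewer note, not part of the patch: below is a minimal usage sketch of the two allocation branches this change touches. The config key `sketch.local.dirs` and the directory values are invented for illustration; `LocalDirAllocator(String)` and `getLocalPathForWrite(String, long, Configuration)` are the public APIs the tests above exercise.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;

public class LocalDirAllocatorSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // two buffer directories; the allocator round-robins across them.
    // the key and paths here are illustrative assumptions only
    conf.set("sketch.local.dirs", "/tmp/alloc/dir0,/tmp/alloc/dir1");
    LocalDirAllocator allocator = new LocalDirAllocator("sketch.local.dirs");
    try {
      // known size: takes the linear-search branch changed above
      Path known = allocator.getLocalPathForWrite("scratch/part-0000", 512, conf);
      System.out.println("allocated " + known);
      // unknown size (-1 == SIZE_UNKNOWN): takes the roulette-wheel branch
      Path unknown = allocator.getLocalPathForWrite("scratch/part-0001", -1, conf);
      System.out.println("allocated " + unknown);
    } catch (IOException e) {
      // with this patch a failed allocation logs the accumulated search
      // history at ERROR, and the exception message names the host and
      // the probed directories
      System.err.println(e);
    }
  }
}
```

The point of the `history`/`note()` mechanism is visible here: on failure the caller still gets a single `DiskErrorException`, but the service log now records every directory probe and mkdirs attempt that led up to it.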