diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java index 5cf8edf833911..2465f606ee66f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java @@ -26,9 +26,15 @@ import java.io.InputStreamReader; import java.io.PrintStream; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; import java.util.Iterator; +import java.util.List; import java.util.StringTokenizer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; @@ -55,6 +61,7 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,27 +141,61 @@ private void cleanupBeforeTestrun() throws IOException { * * @throws IOException on error */ - private void createControlFiles() throws IOException { + private void createControlFiles() { LOG.info("Creating " + numberOfMaps + " control files"); + ExecutorService executorService = + HadoopExecutors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); + List> list = new ArrayList<>(); for (int i = 0; i < numberOfMaps; i++) { String strFileName = "NNBench_Controlfile_" + i; Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME), strFileName); + Future future = executorService.submit(new CreateControlFile(strFileName, filePath, i)); + list.add(future); + } + + for (int i = 0; i < list.size(); i++) { + try { + list.get(i).get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Creating control files Error.", e); + } + } + + executorService.shutdown(); + } + + private class CreateControlFile implements Callable { + private String strFileName; + private Path filePath; + private int order; + + CreateControlFile(String strFileName, Path filePath, int order) { + this.strFileName = strFileName; + this.filePath = filePath; + this.order = order; + } + + @Override + public Void call() throws Exception { SequenceFile.Writer writer = null; try { writer = SequenceFile.createWriter(getConf(), Writer.file(filePath), Writer.keyClass(Text.class), Writer.valueClass(LongWritable.class), Writer.compression(CompressionType.NONE)); - writer.append(new Text(strFileName), new LongWritable(i)); + writer.append(new Text(strFileName), new LongWritable(order)); } finally { if (writer != null) { writer.close(); } } + return null; } + } + /** * Display version */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/TestNNBench.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/TestNNBench.java index 5f669d6b57897..0273613f9fb2f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/TestNNBench.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/TestNNBench.java @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.HadoopTestCase; import org.apache.hadoop.mapred.JobConf; @@ -37,6 +38,8 @@ public class TestNNBench extends HadoopTestCase { private static final String BASE_DIR = new File(System.getProperty("test.build.data", "build/test/data"), "NNBench").getAbsolutePath(); + private static final String CONTROL_DIR_NAME = "control"; + public TestNNBench() throws IOException { super(LOCAL_MR, LOCAL_FS, 1, 1); @@ -74,6 +77,15 @@ public void testNNBenchCreateAndRename() throws Exception { getFileSystem().exists(renamedPath)); } + @Test(timeout = 30000) + public void testNNBenchCreateControlFilesWithPool() throws Exception { + runNNBench(createJobConf(), "create_write", BASE_DIR, "5"); + Path path = new Path(BASE_DIR, CONTROL_DIR_NAME); + + FileStatus[] fileStatuses = getFileSystem().listStatus(path); + assertEquals(5, fileStatuses.length); + } + @Test(timeout = 30000) public void testNNBenchCrossCluster() throws Exception { MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(new JobConf()) @@ -97,6 +109,14 @@ private void runNNBench(Configuration conf, String operation, String baseDir) assertEquals(0, ToolRunner.run(conf, new NNBench(), genArgs)); } + private void runNNBench(Configuration conf, String operation, String baseDir, String numMaps) + throws Exception { + String[] genArgs = {"-operation", operation, "-baseDir", baseDir, + "-startTime", "" + (Time.now() / 1000 + 3), "-blockSize", "1024", "-maps", numMaps}; + + assertEquals(0, ToolRunner.run(conf, new NNBench(), genArgs)); + } + private void runNNBench(Configuration conf, String operation) throws Exception { runNNBench(conf, operation, BASE_DIR);