diff --git a/app/com/linkedin/drelephant/spark/heuristics/ConfigurationUtils.scala b/app/com/linkedin/drelephant/spark/heuristics/ConfigurationUtils.scala index 37a4f2a9f..75ef7493f 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/ConfigurationUtils.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/ConfigurationUtils.scala @@ -54,8 +54,27 @@ object ConfigurationUtils { val TASK_SKEW_TASK_DURATION_MIN_THRESHOLD_KEY = "task_skew_task_duration_threshold" val MAX_RECOMMENDED_PARTITIONS_KEY = "max_recommended_partitions" - - // Severity hresholds for task duration in minutes, when checking to see if the median task + // keys for finding specific recommendations + val EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION_KEY = "execution_memory_spill_large_data_recommendation" + val TASK_SKEW_INPUT_DATA_RECOMMENDATION_KEY = "task_skew_input_data_recommendation" + val TASK_SKEW_GENERIC_RECOMMENDATION_KEY = "task_skew_generic_recommendation" + val LONG_TASKS_LARGE_DATA_RECOMMENDATION_KEY = "long_tasks_large_data_recommendation" + val SLOW_TASKS_RECOMMENDATION_KEY = "slow_tasks_recommendation" + val LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION_KEY = "long_tasks_few_partitions" + val LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION_KEY = "long_tasks_few_input_partitions" + + // default recommendations + val DEFAULT_EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION = "a large amount of data is being processed. " + + "Examine the application to see if this can be reduced" + val DEFAULT_TASK_SKEW_INPUT_DATA_RECOMMENDATION = "please try to modify the application to make the input partitions more even" + val DEFAULT_TASK_SKEW_GENERIC_RECOMMENDATION = "please try to modify the application to make the partitions more even" + val DEFAULT_LONG_TASKS_LARGE_DATA_RECOMMENDATION = "please try to reduce the amount of data being processed" + val DEFAULT_SLOW_TASKS_RECOMMENDATION = "please optimize the code to improve performance" + val DEFAULT_LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION = "please increase the number of partitions" + val DEFAULT_LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION = "please increase the number of partitions for reading data" + + + // Severity thresholds for task duration in minutes, when checking to see if the median task // run time is too long for a stage. val DEFAULT_TASK_DURATION_THRESHOLDS = SeverityThresholds(low = 2.5D * MILLIS_PER_MIN, moderate = 5.0D * MILLIS_PER_MIN, @@ -82,6 +101,11 @@ object ConfigurationUtils { // The default threshold (3TB) for checking for maximum amount of data processed, for which to // alert for execution memory spill. Tasks processing more data would be expected to have some // amount of spill, due to the large amount of data processed. 
+ // Estimating the size based on some reasonable values for configuration parameters (and how + // much data could be kept in unified memory given these values): + // spark.executor.memory / spark.executor.cores * spark.memory.fraction * + // (1 - spark.memory.storageFraction) * spark.sql.shuffle.partitions + // = 5GB / 2 * 0.6 * (1 - 0.5) * 4000 val DEFAULT_MAX_DATA_PROCESSED_THRESHOLD = "3TB" // The default threshold for the ratio of the time for longest running task for a stage to the diff --git a/app/com/linkedin/drelephant/spark/heuristics/StagesAnalyzer.scala b/app/com/linkedin/drelephant/spark/heuristics/StagesAnalyzer.scala index 9d3ce2701..e4d4a4a7d 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/StagesAnalyzer.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/StagesAnalyzer.scala @@ -102,6 +102,43 @@ private[heuristics] class StagesAnalyzer( private val maxRecommendedPartitions = heuristicConfigurationData.getParamMap .getOrDefault(MAX_RECOMMENDED_PARTITIONS_KEY, DEFAULT_MAX_RECOMMENDED_PARTITIONS).toInt + // recommendation to give if there is execution memory spill due to too much data being processed. + // Some amount of spill is expected in this case, but alert the users so that they are aware that + // spill is happening. + private val executionMemorySpillRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION_KEY, + DEFAULT_EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION) + + // recommendation to give if task skew is detected, and input data is read for the stage. + private val taskSkewInputDataRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(TASK_SKEW_INPUT_DATA_RECOMMENDATION_KEY, DEFAULT_TASK_SKEW_INPUT_DATA_RECOMMENDATION) + + // recommendation to give if task skew is detected, and there is no input data. + private val taskSkewGenericRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(TASK_SKEW_GENERIC_RECOMMENDATION_KEY, DEFAULT_TASK_SKEW_GENERIC_RECOMMENDATION) + + // recommendation to give if there are long running tasks, and there is a lot of data being + // processed, and many partitions already. In this case, long running tasks may be expected, but + // alert the user, in case it is possible to filter out some data. + private val longTasksLargeDataRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(LONG_TASKS_LARGE_DATA_RECOMMENDATION_KEY, DEFAULT_LONG_TASKS_LARGE_DATA_RECOMMENDATION) + + // recommendation to give if there are long running tasks, a reasonable number of partitions, + // and not too much data processed. In this case, the tasks are slow. + private val slowTasksRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(SLOW_TASKS_RECOMMENDATION_KEY, DEFAULT_SLOW_TASKS_RECOMMENDATION) + + // recommendation to give if there are long running tasks and relatively few partitions. + private val longTasksFewPartitionsRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION_KEY, DEFAULT_LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION) + + // recommendation to give if there are long running tasks, input data is being read (which + // controls the number of tasks), and relatively few partitions. 
+ private val longTasksFewInputPartitionsRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION_KEY, + DEFAULT_LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION) + + /** @return list of analysis results for all the stages. */ def getStageAnalysis(): Seq[StageAnalysis] = { val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties @@ -158,8 +195,7 @@ private[heuristics] class StagesAnalyzer( // don't flag execution memory spill if there is a lot of data being processed, // since some spill may be unavoidable in this case. if (hasSignificantSeverity(rawSpillSeverity)) { - details += s"Stage $stageId is processing a lot of data; examine the application to see " + - s"if this can be reduced." + details += s"Stage $stageId: ${executionMemorySpillRecommendation}." } Severity.NONE } @@ -246,7 +282,7 @@ private[heuristics] class StagesAnalyzer( var inputSkewSeverity = Severity.NONE if (hasSignificantSeverity(taskSkewSeverity)) { details += - s"Stage $stageId has skew in task run time (median is $medianStr, max is $maximumStr)" + s"Stage $stageId has skew in task run time (median is $medianStr, max is $maximumStr)." } stageData.taskSummary.foreach { summary => checkSkewedData(stageId, summary.memoryBytesSpilled(DISTRIBUTION_MEDIAN_IDX), @@ -256,8 +292,7 @@ private[heuristics] class StagesAnalyzer( input.bytesRead(DISTRIBUTION_MAX_IDX), "task input bytes", details) if (hasSignificantSeverity(inputSkewSeverity)) { // The stage is reading input data, try to adjust the amount of data to even the partitions - details += s"Stage $stageId: please set DaliSpark.SPLIT_SIZE to make " + - "partitions more even." + details += s"Stage $stageId: ${taskSkewInputDataRecommendation}." } } summary.outputMetrics.foreach { output => @@ -274,8 +309,7 @@ private[heuristics] class StagesAnalyzer( } } if (hasSignificantSeverity(rawSkewSeverity) && !hasSignificantSeverity(inputSkewSeverity)) { - details += s"Stage $stageId: please try to modify the application to make " + - "the partitions more even." + details += s"Stage $stageId: ${taskSkewGenericRecommendation}." } } val score = Utils.getHeuristicScore(taskSkewSeverity, stageData.numTasks) @@ -331,26 +365,26 @@ private[heuristics] class StagesAnalyzer( val runTime = Utils.getDuration(medianTime.map(_.toLong).getOrElse(0L)) val maxData = Seq(stageData.inputBytes, stageData.shuffleReadBytes, stageData.shuffleWriteBytes, stageData.outputBytes).max - details += s"Stage $stageId: median task run time is $runTime." + val inputBytes = MemoryFormatUtils.bytesToString(stageData.inputBytes) + val outputBytes = MemoryFormatUtils.bytesToString(stageData.outputBytes) + val shuffleReadBytes = MemoryFormatUtils.bytesToString(stageData.shuffleReadBytes) + val shuffleWriteBytes = MemoryFormatUtils.bytesToString(stageData.shuffleWriteBytes) + details += s"Stage $stageId has a long median task run time of $runTime." + details += s"Stage $stageId has ${stageData.numTasks} tasks, $inputBytes input," + + s" $shuffleReadBytes shuffle read, $shuffleWriteBytes shuffle write, and $outputBytes output." 
if (stageData.numTasks >= maxRecommendedPartitions) { if (maxData >= maxDataProcessedThreshold) { - val inputBytes = MemoryFormatUtils.bytesToString(stageData.inputBytes) - val shuffleReadBytes = MemoryFormatUtils.bytesToString(stageData.shuffleReadBytes) - val shuffleWriteBytes = MemoryFormatUtils.bytesToString(stageData.shuffleWriteBytes) - details += s"Stage $stageId has $inputBytes input, $shuffleReadBytes shuffle read, " + - "$shuffleWriteBytes shuffle write. Please try to reduce the amount of data being processed." + details += s"Stage $stageId: ${longTasksLargeDataRecommendation}." } else { - details += s"Stage $stageId: please optimize the code to improve performance." + details += s"Stage $stageId: ${slowTasksRecommendation}." } } else { if (stageData.inputBytes > 0) { // The stage is reading input data, try to increase the number of readers - details += s"Stage $stageId: please set DaliSpark.SPLIT_SIZE to a smaller " + - "value to increase the number of tasks reading input data for this stage." + details += s"Stage $stageId: ${longTasksFewInputPartitionsRecommendation}." } else if (stageData.numTasks != curNumPartitions) { - details += s"Stage $stageId: please increase the number of partitions, which " + - s"is currently set to ${stageData.numTasks}." + details += s"Stage $stageId: ${longTasksFewPartitionsRecommendation}." } } } diff --git a/test/com/linkedin/drelephant/spark/heuristics/StagesAnalyzerTest.scala b/test/com/linkedin/drelephant/spark/heuristics/StagesAnalyzerTest.scala index 80cfd3179..6d240ebef 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/StagesAnalyzerTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/StagesAnalyzerTest.scala @@ -83,42 +83,42 @@ class StagesAnalyzerTest extends FunSpec with Matchers { Seq()).create(), StageAnalysisBuilder(3, 5).taskRuntime(20, 250) .skew(Severity.SEVERE, Severity.SEVERE, 15, - Seq("Stage 3 has skew in task run time (median is 20.00 sec, max is 4.17 min)", + Seq("Stage 3 has skew in task run time (median is 20.00 sec, max is 4.17 min).", "Stage 3: please try to modify the application to make the partitions more even.")).create(), StageAnalysisBuilder(4, 5).taskRuntime(5, 250).input(260) .skew(Severity.CRITICAL, Severity.CRITICAL, 20, - Seq("Stage 4 has skew in task run time (median is 5.00 sec, max is 4.17 min)", + Seq("Stage 4 has skew in task run time (median is 5.00 sec, max is 4.17 min).", "Stage 4 has skew in task input bytes (median is 5 MB, max is 250 MB).", - "Stage 4: please set DaliSpark.SPLIT_SIZE to make partitions more even.")).create(), + "Stage 4: please try to modify the application to make the input partitions more even.")).create(), StageAnalysisBuilder(5, 5).taskRuntime(50, 250).shuffleRead(350).shuffleWrite(400) .skew(Severity.MODERATE, Severity.MODERATE, 10, - Seq("Stage 5 has skew in task run time (median is 50.00 sec, max is 4.17 min)", + Seq("Stage 5 has skew in task run time (median is 50.00 sec, max is 4.17 min).", "Stage 5 has skew in task shuffle read bytes (median is 50 MB, max is 250 MB).", "Stage 5 has skew in task shuffle write bytes (median is 50 MB, max is 250 MB).", "Stage 5: please try to modify the application to make the partitions more even.")).create(), StageAnalysisBuilder(6, 5).taskRuntime(50, 250).shuffleRead(50).output(50) .skew(Severity.MODERATE, Severity.MODERATE, 10, - Seq( "Stage 6 has skew in task run time (median is 50.00 sec, max is 4.17 min)", + Seq( "Stage 6 has skew in task run time (median is 50.00 sec, max is 4.17 min).", "Stage 6: please try to modify 
the application to make the partitions more even.")).create(), StageAnalysisBuilder(7, 5).taskRuntime(20, 250).shuffleWrite(600).output(290) .skew(Severity.SEVERE, Severity.SEVERE, 15, - Seq("Stage 7 has skew in task run time (median is 20.00 sec, max is 4.17 min)", + Seq("Stage 7 has skew in task run time (median is 20.00 sec, max is 4.17 min).", "Stage 7 has skew in task output bytes (median is 20 MB, max is 250 MB).", "Stage 7: please try to modify the application to make the partitions more even.")).create(), StageAnalysisBuilder(8, 3).taskRuntime(200, 250) .longTask(Severity.LOW, 0, Seq()).create(), StageAnalysisBuilder(9, 3).taskRuntime(5, 250) .skew(Severity.CRITICAL, Severity.CRITICAL, 12, - Seq("Stage 9 has skew in task run time (median is 5.00 sec, max is 4.17 min)", + Seq("Stage 9 has skew in task run time (median is 5.00 sec, max is 4.17 min).", "Stage 9: please try to modify the application to make the partitions more even.")).create(), StageAnalysisBuilder(10, 3).taskRuntime(20, 250).input(300) .skew(Severity.SEVERE, Severity.SEVERE, 9, - Seq("Stage 10 has skew in task run time (median is 20.00 sec, max is 4.17 min)", + Seq("Stage 10 has skew in task run time (median is 20.00 sec, max is 4.17 min).", "Stage 10 has skew in task input bytes (median is 20 MB, max is 250 MB).", - "Stage 10: please set DaliSpark.SPLIT_SIZE to make partitions more even.")).create(), + "Stage 10: please try to modify the application to make the input partitions more even.")).create(), StageAnalysisBuilder(11, 3).taskRuntime(50, 250).shuffleRead(350) .skew(Severity.MODERATE, Severity.MODERATE, 6, - Seq("Stage 11 has skew in task run time (median is 50.00 sec, max is 4.17 min)", + Seq("Stage 11 has skew in task run time (median is 50.00 sec, max is 4.17 min).", "Stage 11 has skew in task shuffle read bytes (median is 50 MB, max is 250 MB).", "Stage 11: please try to modify the application to make the partitions more even.")).create(), StageAnalysisBuilder(12, 5).taskRuntime(2, 50).duration(60) @@ -148,7 +148,9 @@ class StagesAnalyzerTest extends FunSpec with Matchers { StageBuilder(5, 3).taskRuntime(1200, 1500, 4000).create(), StageBuilder(6, 3).taskRuntime(700, 3500, 4500).create(), StageBuilder(7, 2).taskRuntime(700, 900, 2000).create(), - StageBuilder(8, 3).taskRuntime(3000, 3000, 9000).input(2 << 20, 3 << 20, 5 << 20).create()) + StageBuilder(8, 3).taskRuntime(3000, 3000, 9000).input(2 << 20, 3 << 20, 5 << 20).create(), + StageBuilder(9, 4003).taskRuntime(3000, 3000, 9000).shuffleRead(2 << 20, 3 << 20, 5 << 20).create(), + StageBuilder(10, 4000).taskRuntime(700, 900, 2000).create()) val properties = Map( "spark.sql.shuffle.partitions" -> "3") val data = createSparkApplicationData(stages, Seq.empty, Some(properties)) @@ -156,23 +158,38 @@ class StagesAnalyzerTest extends FunSpec with Matchers { StageAnalysisBuilder(1, 3).taskRuntime(120, 150).create(), StageAnalysisBuilder(2, 3).taskRuntime(180, 200).longTask(Severity.LOW, 0, Seq()).create(), StageAnalysisBuilder(3, 3).taskRuntime(400, 500).longTask(Severity.MODERATE, 6, - Seq("Stage 3: median task run time is 6.67 min.")).create(), + Seq("Stage 3 has a long median task run time of 6.67 min.", + "Stage 3 has 3 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.")).create(), StageAnalysisBuilder(4, 3).taskRuntime(700, 900).longTask(Severity.SEVERE, 9, - Seq("Stage 4: median task run time is 11.67 min.")).create(), + Seq("Stage 4 has a long median task run time of 11.67 min.", + "Stage 4 has 3 tasks, 0 B input, 0 B shuffle read, 0 
B shuffle write, and 0 B output.")).create(), StageAnalysisBuilder(5, 3).taskRuntime(1200, 1500).longTask(Severity.CRITICAL, 12, - Seq("Stage 5: median task run time is 20.00 min.")).create(), + Seq("Stage 5 has a long median task run time of 20.00 min.", + "Stage 5 has 3 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.")).create(), StageAnalysisBuilder(6, 3).taskRuntime(700, 3500).longTask(Severity.SEVERE, 9, - Seq("Stage 6: median task run time is 11.67 min.")) + Seq("Stage 6 has a long median task run time of 11.67 min.", + "Stage 6 has 3 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.")) .skew(Severity.MODERATE, Severity.MODERATE, 6, - Seq("Stage 6 has skew in task run time (median is 11.67 min, max is 58.33 min)", - "Stage 6: please try to modify the application to make the partitions more even.")).create(), + Seq("Stage 6 has skew in task run time (median is 11.67 min, max is 58.33 min).", + "Stage 6: please try to modify the application to make the partitions more even.")).create(), StageAnalysisBuilder(7, 2).taskRuntime(700, 900).longTask(Severity.SEVERE, 6, - Seq("Stage 7: median task run time is 11.67 min.", - "Stage 7: please increase the number of partitions, which is currently set to 2.")).create(), + Seq("Stage 7 has a long median task run time of 11.67 min.", + "Stage 7 has 2 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 7: please increase the number of partitions.")).create(), StageAnalysisBuilder(8, 3).taskRuntime(3000, 3000).longTask(Severity.CRITICAL, 12, - Seq("Stage 8: median task run time is 50.00 min.", - "Stage 8: please set DaliSpark.SPLIT_SIZE to a smaller value to increase the number of tasks reading input data for this stage.")) - .input(5 << 20).create()) + Seq("Stage 8 has a long median task run time of 50.00 min.", + "Stage 8 has 3 tasks, 5 TB input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 8: please increase the number of partitions for reading data.")) + .input(5 << 20).create(), + StageAnalysisBuilder(9, 4003).taskRuntime(3000, 3000).longTask(Severity.CRITICAL, 16012, + Seq("Stage 9 has a long median task run time of 50.00 min.", + "Stage 9 has 4003 tasks, 0 B input, 5 TB shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 9: please try to reduce the amount of data being processed.")) + .shuffleRead(5 << 20).create(), + StageAnalysisBuilder(10, 4000).taskRuntime(700, 900).longTask(Severity.SEVERE, 12000, + Seq("Stage 10 has a long median task run time of 11.67 min.", + "Stage 10 has 4000 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 10: please optimize the code to improve performance.")).create()) val stageAnalyzer = new StagesAnalyzer(heuristicConfigurationData, data) val stageAnalysis = stageAnalyzer.getStageAnalysis() @@ -222,35 +239,37 @@ class StagesAnalyzerTest extends FunSpec with Matchers { Seq("Stage 3 has 400 MB execution memory spill.")) .skew(Severity.NONE, Severity.NONE, 0, Seq("Stage 3 has skew in task input bytes (median is 500 MB, max is 1.95 GB).", - "Stage 3: please set DaliSpark.SPLIT_SIZE to make partitions more even.")) + "Stage 3: please try to modify the application to make the input partitions more even.")) .create(), StageAnalysisBuilder(4, 5).taskRuntime(300, 350).shuffleWrite(5000) .longTask(Severity.MODERATE, 10, - Seq("Stage 4: median task run time is 5.00 min.")) + Seq("Stage 4 has a long median task run time of 5.00 min.", + "Stage 4 has 5 tasks, 0 B input, 0 B shuffle read, 4.88 
GB shuffle write, and 0 B output.")) .spill(Severity.SEVERE, Severity.SEVERE, 15, 350, 1500, Seq("Stage 4 has 1.46 GB execution memory spill.")).create(), StageAnalysisBuilder(5, 5).taskRuntime(300, 2500).shuffleRead(16000).shuffleWrite(3000) - .longTask(Severity.MODERATE, 10, Seq("Stage 5: median task run time is 5.00 min.")) + .longTask(Severity.MODERATE, 10, Seq("Stage 5 has a long median task run time of 5.00 min.", + "Stage 5 has 5 tasks, 0 B input, 15.62 GB shuffle read, 2.93 GB shuffle write, and 0 B output.")) .skew(Severity.SEVERE, Severity.SEVERE, 15, - Seq("Stage 5 has skew in task run time (median is 5.00 min, max is 41.67 min)", - "Stage 5 has skew in memory bytes spilled (median is 300 MB, max is 2.44 GB).", - "Stage 5 has skew in task shuffle read bytes (median is 1,000 MB, max is 4.88 GB).", - "Stage 5 has skew in task shuffle write bytes (median is 300 MB, max is 2.44 GB).", - "Stage 5: please try to modify the application to make the partitions more even.")) + Seq("Stage 5 has skew in task run time (median is 5.00 min, max is 41.67 min).", + "Stage 5 has skew in memory bytes spilled (median is 300 MB, max is 2.44 GB).", + "Stage 5 has skew in task shuffle read bytes (median is 1,000 MB, max is 4.88 GB).", + "Stage 5 has skew in task shuffle write bytes (median is 300 MB, max is 2.44 GB).", + "Stage 5: please try to modify the application to make the partitions more even.")) .spill(Severity.MODERATE, Severity.MODERATE, 10, 2500, 3000 , Seq("Stage 5 has 2.93 GB execution memory spill.")).create(), StageAnalysisBuilder(6, 3).taskRuntime(50, 250).input(350) .skew(Severity.MODERATE, Severity.MODERATE, 6, - Seq("Stage 6 has skew in task run time (median is 50.00 sec, max is 4.17 min)", + Seq("Stage 6 has skew in task run time (median is 50.00 sec, max is 4.17 min).", "Stage 6 has skew in task input bytes (median is 50 MB, max is 250 MB).", - "Stage 6: please set DaliSpark.SPLIT_SIZE to make partitions more even.")) + "Stage 6: please try to modify the application to make the input partitions more even.")) .spill(Severity.CRITICAL, Severity.CRITICAL, 12, 250, 750, Seq("Stage 6 has 750 MB execution memory spill.")).create(), StageAnalysisBuilder(7, 3).taskRuntime(50, 250).output(1500) .skew(Severity.MODERATE, Severity.MODERATE, 6, - Seq("Stage 7 has skew in task run time (median is 50.00 sec, max is 4.17 min)", - "Stage 7 has skew in task output bytes (median is 250 MB, max is 1,000 MB).", - "Stage 7: please try to modify the application to make the partitions more even.")) + Seq("Stage 7 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 7 has skew in task output bytes (median is 250 MB, max is 1,000 MB).", + "Stage 7: please try to modify the application to make the partitions more even.")) .spill(Severity.CRITICAL, Severity.CRITICAL, 12, 250, 750, Seq("Stage 7 has 750 MB execution memory spill.")).create(), StageAnalysisBuilder(8, 5).taskRuntime(2, 50).duration(60).shuffleRead(1500) @@ -260,51 +279,52 @@ class StagesAnalyzerTest extends FunSpec with Matchers { Seq("Stage 8 has 750 MB execution memory spill.")).create(), StageAnalysisBuilder(9, 5).taskRuntime(50, 250).output(6L << 20) .skew(Severity.MODERATE, Severity.MODERATE, 10, - Seq("Stage 9 has skew in task run time (median is 50.00 sec, max is 4.17 min)", + Seq("Stage 9 has skew in task run time (median is 50.00 sec, max is 4.17 min).", "Stage 9 has skew in memory bytes spilled (median is 50 MB, max is 250 MB).", "Stage 9 has skew in task output bytes (median is 50 MB, max is 250 MB).", - "Stage 
9: please try to modify the application to make the partitions more even.")) + "Stage 9: please try to modify the application to make the partitions more even.") + ) .spill(Severity.SEVERE, Severity.NONE, 0, 250, 2L << 20, - Seq("Stage 9 is processing a lot of data; examine the application to see if this can be reduced.", + Seq("Stage 9: a large amount of data is being processed. Examine the application to see if this can be reduced.", "Stage 9 has 2 TB execution memory spill.", "Stage 9 has 5 tasks, 0 B input read, 0 B shuffle read, 0 B shuffle write, 6 TB output.", "Stage 9 has median task values: 50 MB memory spill, 0 B input, 0 B shuffle read, 0 B shuffle write, 50 MB output.")) .create(), StageAnalysisBuilder(10, 5).taskRuntime(50, 250).input(6 << 20) .skew(Severity.MODERATE, Severity.MODERATE, 10, - Seq("Stage 10 has skew in task run time (median is 50.00 sec, max is 4.17 min)", + Seq("Stage 10 has skew in task run time (median is 50.00 sec, max is 4.17 min).", "Stage 10 has skew in memory bytes spilled (median is 50 MB, max is 250 MB).", "Stage 10 has skew in task input bytes (median is 50 MB, max is 250 MB).", - "Stage 10: please set DaliSpark.SPLIT_SIZE to make partitions more even.")) + "Stage 10: please try to modify the application to make the input partitions more even.")) .spill(Severity.SEVERE, Severity.NONE, 0, 250, 2L << 20, - Seq("Stage 10 is processing a lot of data; examine the application to see if this can be reduced.", + Seq("Stage 10: a large amount of data is being processed. Examine the application to see if this can be reduced.", "Stage 10 has 2 TB execution memory spill.", "Stage 10 has 5 tasks, 6 TB input read, 0 B shuffle read, 0 B shuffle write, 0 B output.", "Stage 10 has median task values: 50 MB memory spill, 50 MB input, 0 B shuffle read, 0 B shuffle write, 0 B output.")) .create(), StageAnalysisBuilder(11, 3).taskRuntime(50, 250).input(6 << 20) .skew(Severity.MODERATE, Severity.MODERATE, 6, - Seq("Stage 11 has skew in task run time (median is 50.00 sec, max is 4.17 min)", + Seq("Stage 11 has skew in task run time (median is 50.00 sec, max is 4.17 min).", "Stage 11 has skew in memory bytes spilled (median is 50 MB, max is 250 MB).", "Stage 11 has skew in task input bytes (median is 50 MB, max is 250 MB).", - "Stage 11: please set DaliSpark.SPLIT_SIZE to make partitions more even.")) + "Stage 11: please try to modify the application to make the input partitions more even.")) .spill(Severity.CRITICAL, Severity.NONE, 0, 250, 3L << 20, - Seq("Stage 11 is processing a lot of data; examine the application to see if this can be reduced.", + Seq("Stage 11: a large amount of data is being processed. 
Examine the application to see if this can be reduced.", "Stage 11 has 3 TB execution memory spill.", "Stage 11 has 3 tasks, 6 TB input read, 0 B shuffle read, 0 B shuffle write, 0 B output.", "Stage 11 has median task values: 50 MB memory spill, 50 MB input, 0 B shuffle read, 0 B shuffle write, 0 B output.")) .create(), StageAnalysisBuilder(12, 3).taskRuntime(50, 250).output(6L << 20) .skew(Severity.MODERATE, Severity.MODERATE, 6, - Seq("Stage 12 has skew in task run time (median is 50.00 sec, max is 4.17 min)", - "Stage 12 has skew in memory bytes spilled (median is 50 MB, max is 250 MB).", - "Stage 12 has skew in task output bytes (median is 50 MB, max is 250 MB).", - "Stage 12: please try to modify the application to make the partitions more even.")) + Seq("Stage 12 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 12 has skew in memory bytes spilled (median is 50 MB, max is 250 MB).", + "Stage 12 has skew in task output bytes (median is 50 MB, max is 250 MB).", + "Stage 12: please try to modify the application to make the partitions more even.")) .spill(Severity.CRITICAL, Severity.NONE, 0, 250, 4L << 20, - Seq("Stage 12 is processing a lot of data; examine the application to see if this can be reduced.", - "Stage 12 has 4 TB execution memory spill.", - "Stage 12 has 3 tasks, 0 B input read, 0 B shuffle read, 0 B shuffle write, 6 TB output.", - "Stage 12 has median task values: 50 MB memory spill, 0 B input, 0 B shuffle read, 0 B shuffle write, 50 MB output.")) + Seq("Stage 12: a large amount of data is being processed. Examine the application to see if this can be reduced.", + "Stage 12 has 4 TB execution memory spill.", + "Stage 12 has 3 tasks, 0 B input read, 0 B shuffle read, 0 B shuffle write, 6 TB output.", + "Stage 12 has median task values: 50 MB memory spill, 0 B input, 0 B shuffle read, 0 B shuffle write, 50 MB output.")) .create()) val stageAnalyzer = new StagesAnalyzer(heuristicConfigurationData, data) @@ -315,6 +335,71 @@ class StagesAnalyzerTest extends FunSpec with Matchers { } } + it("custom recommendations") { + val heuristicConfigurationData = createHeuristicConfigurationData( + Map("execution_memory_spill_large_data_recommendation" -> "please try to filter the data to reduce the size", + "task_skew_input_data_recommendation" -> "please set DaliSpark.SPLIT_SIZE to make partitions more even", + "task_skew_generic_recommendation" -> "please make the partitions more even", + "long_tasks_large_data_recommendation" -> "please try to filter the data to reduce the size and increase speed", + "slow_tasks_recommendation" -> "optimize the code to increase speed", + "long_tasks_few_partitions" -> "increase the number of partitions to speed up the stage", + "long_tasks_few_input_partitions" -> "please set DaliSpark.SPLIT_SIZE to make partitions more even")) + val stages = Seq( + StageBuilder(1, 4003).taskRuntime(3000, 3000, 9000).shuffleRead(2 << 20, 3 << 20, 5 << 20).create(), + StageBuilder(2, 4000).taskRuntime(700, 900, 2000).create(), + StageBuilder(3, 2).taskRuntime(700, 900, 2000).create(), + StageBuilder(4, 3).taskRuntime(3000, 3000, 9000).input(2 << 20, 3 << 20, 5 << 20).create(), + StageBuilder(5, 3).taskRuntime(5, 250, 70).create(), + StageBuilder(6, 3).taskRuntime(20, 250, 300).input(20, 250, 300).create(), + StageBuilder(9, 5).taskRuntime(50, 50, 350).output(250, 250, 6L << 20) + .spill(250, 250, 2L << 20).create()) + val properties = Map( "spark.sql.shuffle.partitions" -> "3") + val data = createSparkApplicationData(stages, Seq.empty, 
Some(properties)) + + val expectedAnalysis = Seq( + StageAnalysisBuilder(1, 4003).taskRuntime(3000, 3000).longTask(Severity.CRITICAL, 16012, + Seq("Stage 1 has a long median task run time of 50.00 min.", + "Stage 1 has 4003 tasks, 0 B input, 5 TB shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 1: please try to filter the data to reduce the size and increase speed.")) + .shuffleRead(5 << 20).create(), + StageAnalysisBuilder(2, 4000).taskRuntime(700, 900).longTask(Severity.SEVERE, 12000, + Seq("Stage 2 has a long median task run time of 11.67 min.", + "Stage 2 has 4000 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 2: optimize the code to increase speed.")).create(), + StageAnalysisBuilder(3, 2).taskRuntime(700, 900).longTask(Severity.SEVERE, 6, + Seq("Stage 3 has a long median task run time of 11.67 min.", + "Stage 3 has 2 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 3: increase the number of partitions to speed up the stage.")).create(), + StageAnalysisBuilder(4, 3).taskRuntime(3000, 3000).longTask(Severity.CRITICAL, 12, + Seq("Stage 4 has a long median task run time of 50.00 min.", + "Stage 4 has 3 tasks, 5 TB input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 4: please set DaliSpark.SPLIT_SIZE to make partitions more even.")) + .input(5 << 20).create(), + StageAnalysisBuilder(5, 3).taskRuntime(5, 250) + .skew(Severity.CRITICAL, Severity.CRITICAL, 12, + Seq("Stage 5 has skew in task run time (median is 5.00 sec, max is 4.17 min).", + "Stage 5: please make the partitions more even.") + ).create(), + StageAnalysisBuilder(6, 3).taskRuntime(20, 250).input(300) + .skew(Severity.SEVERE, Severity.SEVERE, 9, + Seq("Stage 6 has skew in task run time (median is 20.00 sec, max is 4.17 min).", + "Stage 6 has skew in task input bytes (median is 20 MB, max is 250 MB).", + "Stage 6: please set DaliSpark.SPLIT_SIZE to make partitions more even.")).create(), + StageAnalysisBuilder(9, 5).taskRuntime(50, 50).output(6L << 20) + .spill(Severity.SEVERE, Severity.NONE, 0, 250, 2L << 20, + Seq("Stage 9: please try to filter the data to reduce the size.", + "Stage 9 has 2 TB execution memory spill.", + "Stage 9 has 5 tasks, 0 B input read, 0 B shuffle read, 0 B shuffle write, 6 TB output.", + "Stage 9 has median task values: 250 MB memory spill, 0 B input, 0 B shuffle read, 0 B shuffle write, 250 MB output.")) + .create()) + + val stageAnalyzer = new StagesAnalyzer(heuristicConfigurationData, data) + val stageAnalysis = stageAnalyzer.getStageAnalysis() + (0 until expectedAnalysis.size).foreach { i => + compareStageAnalysis(stageAnalysis(i), expectedAnalysis(i)) + } + } + /** compare actual and expected StageAnalysis */ private def compareStageAnalysis(actual: StageAnalysis, expected: StageAnalysis): Unit = { compareExecutionMemorySpillResult(actual.executionMemorySpillResult, expected.executionMemorySpillResult) @@ -331,7 +416,6 @@ class StagesAnalyzerTest extends FunSpec with Matchers { actual.outputBytes should be(expected.outputBytes) actual.shuffleReadBytes should be(expected.shuffleReadBytes) actual.shuffleWriteBytes should be(expected.shuffleWriteBytes) - } /** compare actual and expected ExecutionMemorySpillResult */
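
A note on the 3TB default (DEFAULT_MAX_DATA_PROCESSED_THRESHOLD) in the first hunk: the comment's estimate multiplies out to roughly 3TB. A minimal standalone sketch of that arithmetic, using the "reasonable" configuration values named in the comment (the object and value names here are illustrative, not part of the patch):

    object MaxDataProcessedEstimate extends App {
      // Assumed configuration values from the comment in ConfigurationUtils.
      val executorMemoryGb = 5.0    // spark.executor.memory
      val executorCores = 2         // spark.executor.cores
      val memoryFraction = 0.6      // spark.memory.fraction
      val storageFraction = 0.5     // spark.memory.storageFraction
      val shufflePartitions = 4000  // spark.sql.shuffle.partitions

      // Unified execution memory available to one task: memory per core, scaled by
      // the unified-memory fraction, minus the share reserved for storage.
      val perTaskGb = executorMemoryGb / executorCores * memoryFraction * (1 - storageFraction)
      // Across all shuffle partitions, this approximates how much data can be
      // processed without spilling.
      val totalTb = perTaskGb * shufflePartitions / 1024
      println(f"$perTaskGb%.2f GB per task, ~$totalTb%.2f TB total")
      // prints: 0.75 GB per task, ~2.93 TB total -- rounded up to the 3TB default
    }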
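
Every recommendation added to StagesAnalyzer follows the same lookup pattern: read the heuristic's param map by key and fall back to the default in ConfigurationUtils, so deployments can reword the advice without code changes (the "custom recommendations" test exercises exactly this). A minimal sketch of that fallback, reusing the key and default strings from the patch; the wrapping object and the stageId value are illustrative, and the HashMap stands in for the java.util.Map returned by heuristicConfigurationData.getParamMap:

    import java.util.{HashMap => JHashMap}

    object RecommendationLookup extends App {
      val TASK_SKEW_INPUT_DATA_RECOMMENDATION_KEY = "task_skew_input_data_recommendation"
      val DEFAULT_TASK_SKEW_INPUT_DATA_RECOMMENDATION =
        "please try to modify the application to make the input partitions more even"

      // Params configured for the heuristic; an empty map would yield the default.
      val paramMap = new JHashMap[String, String]()
      paramMap.put(TASK_SKEW_INPUT_DATA_RECOMMENDATION_KEY,
        "please set DaliSpark.SPLIT_SIZE to make partitions more even")

      // Same getOrDefault fallback the analyzer uses for each recommendation.
      val taskSkewInputDataRecommendation = paramMap.getOrDefault(
        TASK_SKEW_INPUT_DATA_RECOMMENDATION_KEY, DEFAULT_TASK_SKEW_INPUT_DATA_RECOMMENDATION)

      // Rendered the same way the analyzer appends it to a stage's details.
      val stageId = 4
      println(s"Stage $stageId: $taskSkewInputDataRecommendation.")
      // prints: Stage 4: please set DaliSpark.SPLIT_SIZE to make partitions more even.
    }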