Fix calculation of Spark heuristic score #423
ShubhamGupta29 authored and varunsaxena committed Oct 16, 2018
1 parent 4c365fa · commit ffe4d17
Showing 12 changed files with 83 additions and 13 deletions.
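The change below replaces the hard-coded score of 0 that each Spark heuristic previously reported with evaluator.score, computed as Utils.getHeuristicScore(severity, executorCount). Utils.getHeuristicScore itself is not part of this diff; judging from the test expectations further down (severity value times executor count for SEVERE and CRITICAL results, zero for LOW), it presumably behaves like the Scala sketch below. The object name HeuristicScoreSketch is illustrative only and does not exist in the codebase.

    // Illustrative sketch only: behaviour inferred from the tests in this commit,
    // not the actual implementation of com.linkedin.drelephant.util.Utils.getHeuristicScore.
    import com.linkedin.drelephant.analysis.Severity

    object HeuristicScoreSketch {
      // The score scales with the number of executors the heuristic evaluated;
      // NONE and LOW severities contribute nothing (see the LOW-severity test below).
      def getHeuristicScore(severity: Severity, executorCount: Int): Int =
        if (severity == Severity.NONE || severity == Severity.LOW) 0
        else severity.getValue * executorCount
    }

Each heuristic supplies its own executor count: DriverHeuristic fixes it at 1, while the others use the size of their executor summary list (some of them excluding the driver entry).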

ConfigurationHeuristic.scala
@@ -22,7 +22,7 @@ import scala.util.Try
 import com.linkedin.drelephant.analysis._
 import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
 import com.linkedin.drelephant.spark.data.SparkApplicationData
-import com.linkedin.drelephant.util.MemoryFormatUtils
+import com.linkedin.drelephant.util.{MemoryFormatUtils, Utils}
 import com.linkedin.drelephant.math.Statistics

 /**
@@ -94,7 +94,7 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
 heuristicConfigurationData.getClassName,
 heuristicConfigurationData.getHeuristicName,
 evaluator.severity,
-0,
+evaluator.score,
 mutableResultDetailsArrayList
 )
 if (evaluator.serializerSeverity != Severity.NONE) {
@@ -241,6 +241,13 @@ object ConfigurationHeuristic {

 lazy val severity: Severity = Severity.max(serializerSeverity, shuffleAndDynamicAllocationSeverity, severityConfThresholds)

+if (data.executorSummaries == null) {
+  throw new Exception("Executor Summary is Null.")
+}
+val executorCount = data.executorSummaries.size
+
+lazy val score = Utils.getHeuristicScore(severity, executorCount)
+
 private val serializerIfNonNullRecommendation: String = configurationHeuristic.serializerIfNonNullRecommendation

 private def getProperty(key: String): Option[String] = appConfigurationProperties.get(key)

DriverHeuristic.scala
@@ -23,7 +23,7 @@ import scala.util.Try
 import com.linkedin.drelephant.analysis._
 import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
 import com.linkedin.drelephant.spark.data.SparkApplicationData
-import com.linkedin.drelephant.util.MemoryFormatUtils
+import com.linkedin.drelephant.util.{MemoryFormatUtils, Utils}
 import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary

 /**
@@ -98,7 +98,7 @@ class DriverHeuristic(private val heuristicConfigurationData: HeuristicConfigura
 heuristicConfigurationData.getClassName,
 heuristicConfigurationData.getHeuristicName,
 evaluator.severity,
-0,
+evaluator.score,
 mutableResultDetailsArrayList
 )
 result
@@ -187,6 +187,10 @@ object DriverHeuristic {
 //Severity for the configuration thresholds
 val severityConfThresholds: Severity = Severity.max(severityDriverCores, severityDriverMemory, severityDriverMemoryOverhead)
 lazy val severity: Severity = Severity.max(severityConfThresholds, severityGc, severityJvmUsedMemory)
+
+val executorCount = 1 //For driver number of executor is 1
+lazy val score = Utils.getHeuristicScore(severity, executorCount)
+
 private def getProperty(key: String): Option[String] = appConfigurationProperties.get(key)
 }

ExecutorGcHeuristic.scala
@@ -22,7 +22,7 @@ import com.linkedin.drelephant.analysis._
 import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
 import com.linkedin.drelephant.spark.data.SparkApplicationData
 import com.linkedin.drelephant.math.Statistics
-
+import com.linkedin.drelephant.util.Utils

 import scala.collection.JavaConverters

@@ -66,7 +66,7 @@ class ExecutorGcHeuristic(private val heuristicConfigurationData: HeuristicConfi
 heuristicConfigurationData.getClassName,
 heuristicConfigurationData.getHeuristicName,
 evaluator.severityTimeA,
-0,
+evaluator.score,
 resultDetails.asJava
 )
 result
@@ -116,6 +116,9 @@ object ExecutorGcHeuristic {

 lazy val severityTimeD: Severity = executorGcHeuristic.gcSeverityDThresholds.severityOf(ratio)

+val executorCount = executorSummaries.size
+lazy val score = Utils.getHeuristicScore(severityTimeA, executorCount)
+
 /**
 * returns the total JVM GC Time and total executor Run Time across all stages
 * @param executorSummaries

ExecutorStorageSpillHeuristic.scala
@@ -21,7 +21,7 @@ import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary,
 import com.linkedin.drelephant.analysis._
 import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
 import com.linkedin.drelephant.spark.data.SparkApplicationData
-import com.linkedin.drelephant.util.MemoryFormatUtils
+import com.linkedin.drelephant.util.{MemoryFormatUtils, Utils}

 import scala.collection.JavaConverters

@@ -76,7 +76,7 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur
 heuristicConfigurationData.getClassName,
 heuristicConfigurationData.getHeuristicName,
 evaluator.severity,
-0,
+evaluator.score,
 resultDetails.asJava
 )
 result
@@ -132,6 +132,10 @@ object ExecutorStorageSpillHeuristic {
 else Severity.NONE
 }

+val executorCount = executorSummaries.size
+lazy val score = Utils.getHeuristicScore(severity, executorCount)
+
+
 lazy val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0)
 lazy val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0)
 }

JvmUsedMemoryHeuristic.scala
@@ -20,7 +20,7 @@ import com.linkedin.drelephant.analysis._
 import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
 import com.linkedin.drelephant.spark.data.SparkApplicationData
 import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
-import com.linkedin.drelephant.util.MemoryFormatUtils
+import com.linkedin.drelephant.util.{MemoryFormatUtils, Utils}

 import scala.collection.JavaConverters

@@ -57,7 +57,7 @@ class JvmUsedMemoryHeuristic(private val heuristicConfigurationData: HeuristicCo
 heuristicConfigurationData.getClassName,
 heuristicConfigurationData.getHeuristicName,
 evaluator.severity,
-0,
+evaluator.score,
 resultDetails.asJava
 )
 result
@@ -79,6 +79,10 @@ object JvmUsedMemoryHeuristic {
 lazy val appConfigurationProperties: Map[String, String] =
 data.appConfigurationProperties

+if (data.executorSummaries == null) {
+  throw new Exception("Executor Summary is Null.")
+}
+
 lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
 val executorList: Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))
 val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0L)
@@ -100,6 +104,10 @@
 } else {
 MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkExecutorMemory)
 }
+
+val executorCount = executorList.size
+lazy val score = Utils.getHeuristicScore(severity, executorCount)
+
 }

 }

StagesWithFailedTasksHeuristic.scala
@@ -21,6 +21,7 @@ import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationDa
 import com.linkedin.drelephant.spark.data.SparkApplicationData
 import com.linkedin.drelephant.spark.fetchers.statusapiv1.{StageData, TaskData}
 import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
+import com.linkedin.drelephant.util.Utils

 import scala.collection.JavaConverters

@@ -51,7 +52,7 @@ class StagesWithFailedTasksHeuristic(private val heuristicConfigurationData: Heu
 heuristicConfigurationData.getClassName,
 heuristicConfigurationData.getHeuristicName,
 evaluator.severity,
-0,
+evaluator.score,
 resultDetails.asJava
 )
 result
@@ -142,6 +143,14 @@

 lazy val (severityOOMStages: Severity, severityOverheadStages: Severity, stagesWithOOMError: Int, stagesWithOverheadError: Int) = getErrorsSeverity
 lazy val severity: Severity = Severity.max(severityOverheadStages, severityOOMStages)
+
+if (data.executorSummaries == null) {
+  throw new Exception("Executor Summary is Null.")
+}
+
+val executorCount = data.executorSummaries.filterNot(_.id.equals("driver")).size
+lazy val score = Utils.getHeuristicScore(severity, executorCount)
+
 }

 }

UnifiedMemoryHeuristic.scala
@@ -20,7 +20,7 @@ import com.linkedin.drelephant.analysis._
 import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
 import com.linkedin.drelephant.spark.data.SparkApplicationData
 import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
-import com.linkedin.drelephant.util.MemoryFormatUtils
+import com.linkedin.drelephant.util.{MemoryFormatUtils, Utils}

 import scala.collection.JavaConverters

@@ -56,7 +56,7 @@ class UnifiedMemoryHeuristic(private val heuristicConfigurationData: HeuristicCo
 heuristicConfigurationData.getClassName,
 heuristicConfigurationData.getHeuristicName,
 evaluator.severity,
-0,
+evaluator.score,
 resultDetails.asJava
 )
 result
@@ -128,5 +128,9 @@
 } else {
 Severity.NONE
 }
+
+val executorCount = executorList.size
+lazy val score = Utils.getHeuristicScore(severity, executorCount)
+
 }
 }

DriverHeuristicTest.scala
@@ -39,6 +39,10 @@ class DriverHeuristicTest extends FunSpec with Matchers {
 heuristicResult.getSeverity should be(Severity.SEVERE)
 }

+it("has score") {
+  heuristicResult.getScore should be(Severity.SEVERE.getValue * 1)
+}
+
 describe("Evaluator") {
 val evaluator = new Evaluator(driverHeuristic, data)
 it("has max driver peak JVM memory") {

ExecutorGcHeuristicTest.scala
@@ -106,10 +106,19 @@ class ExecutorGcHeuristicTest extends FunSpec with Matchers {
 heuristicResult.getSeverity should be(Severity.CRITICAL)
 }

+it("returns non-zero score") {
+  heuristicResult.getScore should be(Severity.CRITICAL.getValue * data.executorSummaries
+    .filterNot(_.id.equals("driver")).size)
+}
+
 it("return the low severity") {
 heuristicResult2.getSeverity should be(Severity.LOW)
 }

+it("return the 0 score") {
+  heuristicResult2.getScore should be(0)
+}
+
 it("return NONE severity for runtime less than 5 min") {
 heuristicResult2.getSeverity should be(Severity.LOW)
 }

ExecutorStorageSpillHeuristicTest.scala
@@ -68,6 +68,10 @@ class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers {
 heuristicResult.getSeverity should be(Severity.SEVERE)
 }

+it("returns the score") {
+  heuristicResult.getScore should be(Severity.SEVERE.getValue * data1.executorSummaries.size)
+}
+
 it("returns the total memory spilled") {
 val details = heuristicResultDetails.get(0)
 details.getName should include("Total memory spilled")

JvmUsedMemoryHeuristicTest.scala
@@ -55,6 +55,10 @@ class JvmUsedMemoryHeuristicTest extends FunSpec with Matchers {
 evaluator.severity should be(Severity.CRITICAL)
 }

+it("has non-zero score") {
+  evaluator.score should be(Severity.CRITICAL.getValue * evaluator.executorList.size)
+}
+
 it("has max peak jvm memory") {
 evaluator.maxExecutorPeakJvmUsedMemory should be (394567123)
 }

UnifiedMemoryHeuristicTest.scala
@@ -69,6 +69,11 @@ class UnifiedMemoryHeuristicTest extends FunSpec with Matchers {
 heuristicResult.getSeverity should be(Severity.CRITICAL)
 }

+it("has non-zero score") {
+  heuristicResult.getScore should be(Severity.CRITICAL.getValue * data.executorSummaries
+    .filterNot(_.id.equals("driver")).size)
+}
+
 it("has max value") {
 val details = heuristicResult.getHeuristicResultDetails.get(2)
 details.getName should be("Max peak unified memory")
@@ -85,6 +90,11 @@
 heuristicResult1.getSeverity should be(Severity.CRITICAL)
 }

+it("data1 has non - zero score") {
+  heuristicResult1.getScore should be(Severity.CRITICAL.getValue * data1.executorSummaries
+    .filterNot(_.id.equals("driver")).size)
+}
+
 it("data1 has maxMemory") {
 evaluator.maxMemory should be(999999999)
 }
