LIHADOOP-39635: Add new configuration parameters heuristic #463

Merged · 10 commits · Dec 21, 2018
Changes from 2 commits
22 changes: 22 additions & 0 deletions app-conf/HeuristicConf.xml
@@ -422,4 +422,26 @@
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Configuration Parameters Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ConfigurationParametersHeuristic</classname>
<viewname>views.html.help.spark.helpConfigurationParametersHeuristic</viewname>
<!--<params>
<executor_peak_jvm_memory_threshold>1.25,1.5,2,3</executor_peak_jvm_memory_threshold>
<driver_peak_jvm_memory_threshold>1.25,1.5,2,3</driver_peak_jvm_memory_threshold>
<gc_severity_threshold>0.08,0.09,0.1,0.15</gc_severity_threshold>
<spark_executor_memory_threshold_key>2G</spark_executor_memory_threshold_key>
<spark_stage_execution_memory_spill_threshold>0.01,0.1,0.25,0.5</spark_stage_execution_memory_spill_threshold>
<spark_stage_task_skew_threshold>2,4,8,16</spark_stage_task_skew_threshold>
<spark_stage_task_duration_threshold>150000,300000,600000,900000</spark_stage_task_duration_threshold>
<execution_memory_spill_max_data_threshold>3T</execution_memory_spill_max_data_threshold>
<stage_task_failure_rate_severity_threshold>0.05, 0.1,0.15,0.2</stage_task_failure_rate_severity_threshold>
<task_skew_task_to_stage_duration_ratio>0.75</task_skew_task_to_stage_duration_ratio>
<task_skew_task_duration_threshold>150000</task_skew_task_duration_threshold>
<max_recommended_partitions>4000</max_recommended_partitions>
</params>-->
</heuristic>


</heuristics>
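
The four-value lists in the commented-out params above (for example gc_severity_threshold) appear to follow the usual ascending-severity convention, one boundary per severity level. Below is a minimal, self-contained sketch of how such a list could be parsed and mapped to a severity index; it is illustrative only and is not the project's actual severity-threshold utility.

object ThresholdSketch {
  // Parse a comma-separated list such as "0.08,0.09,0.1,0.15" into ascending thresholds.
  def parse(s: String): Seq[Double] = s.split(",").map(_.trim.toDouble).toSeq

  // Map a measured value to a severity index (0 = none, 4 = most severe),
  // assuming higher values are worse and the thresholds are ascending.
  def severity(value: Double, thresholds: Seq[Double]): Int = thresholds.count(value >= _)

  def main(args: Array[String]): Unit = {
    val gcThresholds = parse("0.08,0.09,0.1,0.15")
    println(severity(0.12, gcThresholds)) // prints 3
  }
}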
11 changes: 9 additions & 2 deletions app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -95,13 +95,20 @@ class SparkRestClient(sparkConf: SparkConf) {
getLogData(attemptTarget)
}
} else Future.successful(None)
val futureFailedTasks = if (fetchFailedTasks) {
Future {
getStagesWithFailedTasks(attemptTarget)
}
} else {
Future.successful(Seq.empty)
}

SparkRestDerivedData(
applicationInfo,
Await.result(futureJobDatas, DEFAULT_TIMEOUT),
Await.result(futureStageDatas, DEFAULT_TIMEOUT),
Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
Seq.empty,
Await.result(futureFailedTasks, DEFAULT_TIMEOUT),
Await.result(futureLogData, Duration(5, SECONDS))
)

@@ -211,7 +218,7 @@ class SparkRestClient(sparkConf: SparkConf) {
}

private def getStageDatas(attemptTarget: WebTarget): Seq[StageDataImpl] = {
val target = attemptTarget.path("stages")
val target = attemptTarget.path("stages/withSummaries")
Contributor:

This is a LinkedIn-specific REST endpoint and won't work in open source until it's contributed back to Spark upstream. Going forward, we should probably refactor the code and have our own SparkRestClient implementation. The abstraction for us is primarily at the fetcher level, so we could have a LinkedIn-specific Spark fetcher implementation that extends the existing SparkFetcher, reuses the event-log fetching, but has a custom Spark REST client implementation.

@edwinalu (Author), Dec 19, 2018:

Yes, this is LinkedIn specific, and separating out the code would make sense. Could the refactoring be done later?
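
A rough sketch of the split discussed above, using hypothetical names (only SparkFetcher and SparkRestClient exist in the codebase; the base/subclass arrangement and every other name below are illustrative, not the actual implementation):

// Illustrative pattern only: keep event-log fetching in a shared base class and let a
// LinkedIn-specific subclass supply a REST client that knows about the internal
// stages/withSummaries endpoint.
trait RestStageSource {
  def stagesPath: String
}

class UpstreamRestStageSource extends RestStageSource {
  // Endpoint available in upstream Spark.
  val stagesPath = "stages"
}

class LinkedInRestStageSource extends RestStageSource {
  // LinkedIn-internal endpoint that also returns per-stage summaries.
  val stagesPath = "stages/withSummaries"
}

abstract class BaseSparkFetcher(stageSource: RestStageSource) {
  // Shared event-log fetching would live here and be reused by both fetchers.
  def fetchEventLogs(appId: String): Unit = ()
  // The REST portion delegates to the injected stage source.
  def stagesEndpoint(appId: String): String = s"applications/$appId/${stageSource.stagesPath}"
}

// LinkedIn-specific fetcher reuses the shared base but swaps in its own REST source.
class LinkedInSparkFetcher extends BaseSparkFetcher(new LinkedInRestStageSource)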

try {
get(target, SparkRestObjectMapper.readValue[Seq[StageDataImpl]])
} catch {
@@ -143,11 +143,17 @@ trait StageData{
def status: StageStatus
def stageId: Int
def attemptId: Int
def numTasks: Int
def numActiveTasks: Int
def numCompleteTasks: Int
def numFailedTasks: Int

def executorRunTime: Long
def executorCpuTime: Long
def submissionTime: Option[Date]
def firstTaskLaunchedTime: Option[Date]
def completionTime: Option[Date]
def failureReason: Option[String]

def inputBytes: Long
def inputRecords: Long
@@ -166,7 +172,14 @@

def accumulatorUpdates: Seq[AccumulableInfo]
def tasks: Option[Map[Long, TaskDataImpl]]
def executorSummary: Option[Map[String, ExecutorStageSummary]]}
def executorSummary: Option[Map[String, ExecutorStageSummary]]

def peakJvmUsedMemory: Option[Long]
def peakExecutionMemory: Option[Long]
def peakStorageMemory: Option[Long]
def peakUnifiedMemory: Option[Long]
def taskSummary : Option[TaskMetricDistributions]
def executorMetricsSummary : Option[ExecutorMetricDistributions]}

trait TaskData{
def taskId: Long
@@ -219,10 +232,15 @@ trait TaskMetricDistributions{
def quantiles: IndexedSeq[Double]

def executorDeserializeTime: IndexedSeq[Double]
def executorDeserializeCpuTime: IndexedSeq[Double]
def executorRunTime: IndexedSeq[Double]
def executorCpuTime: IndexedSeq[Double]
def resultSize: IndexedSeq[Double]
def jvmGcTime: IndexedSeq[Double]
def resultSerializationTime: IndexedSeq[Double]
def gettingResultTime: IndexedSeq[Double]
def schedulerDelay: IndexedSeq[Double]
def peakExecutionMemory: IndexedSeq[Double]
def memoryBytesSpilled: IndexedSeq[Double]
def diskBytesSpilled: IndexedSeq[Double]

@@ -246,13 +264,33 @@ trait ShuffleReadMetricDistributions{
def localBlocksFetched: IndexedSeq[Double]
def fetchWaitTime: IndexedSeq[Double]
def remoteBytesRead: IndexedSeq[Double]
def remoteBytesReadToDisk: IndexedSeq[Double]
def totalBlocksFetched: IndexedSeq[Double]}

trait ShuffleWriteMetricDistributions{
def writeBytes: IndexedSeq[Double]
def writeRecords: IndexedSeq[Double]
def writeTime: IndexedSeq[Double]}

trait ExecutorMetricDistributions{
def quantiles: IndexedSeq[Double]
def numTasks: IndexedSeq[Double]
def inputBytes : IndexedSeq[Double]
def inputRecords : IndexedSeq[Double]
def outputBytes : IndexedSeq[Double]
def outputRecords : IndexedSeq[Double]
def shuffleRead : IndexedSeq[Double]
def shuffleReadRecords : IndexedSeq[Double]
def shuffleWrite : IndexedSeq[Double]
def shuffleWriteRecords : IndexedSeq[Double]
def memoryBytesSpilled : IndexedSeq[Double]
def diskBytesSpilled : IndexedSeq[Double]
def peakJvmUsedMemory : IndexedSeq[Double]
def peakExecutionMemory : IndexedSeq[Double]
def peakStorageMemory : IndexedSeq[Double]
def peakUnifiedMemory : IndexedSeq[Double]}


trait AccumulableInfo{
def id: Long
def name: String
@@ -353,11 +391,17 @@ class StageDataImpl(
var status: StageStatus,
var stageId: Int,
var attemptId: Int,
var numTasks: Int,
var numActiveTasks: Int ,
var numCompleteTasks: Int,
var numFailedTasks: Int,

var executorRunTime: Long,
var executorCpuTime: Long,
var submissionTime: Option[Date],
var firstTaskLaunchedTime: Option[Date],
var completionTime: Option[Date],
var failureReason: Option[String],

var inputBytes: Long,
var inputRecords: Long,
@@ -376,7 +420,13 @@

var accumulatorUpdates: Seq[AccumulableInfoImpl],
var tasks: Option[Map[Long, TaskDataImpl]],
var executorSummary: Option[Map[String, ExecutorStageSummaryImpl]]) extends StageData
var executorSummary: Option[Map[String, ExecutorStageSummaryImpl]],
var peakJvmUsedMemory: Option[Long],
var peakExecutionMemory: Option[Long],
var peakStorageMemory: Option[Long],
var peakUnifiedMemory: Option[Long],
var taskSummary : Option[TaskMetricDistributionsImpl],
var executorMetricsSummary : Option[ExecutorMetricDistributionsImpl]) extends StageData

class TaskDataImpl(
var taskId: Long,
@@ -427,12 +477,16 @@ class ShuffleWriteMetricsImpl(

class TaskMetricDistributionsImpl(
var quantiles: IndexedSeq[Double],

var executorDeserializeTime: IndexedSeq[Double],
var executorDeserializeCpuTime: IndexedSeq[Double],
var executorRunTime: IndexedSeq[Double],
var executorCpuTime: IndexedSeq[Double],
var resultSize: IndexedSeq[Double],
var jvmGcTime: IndexedSeq[Double],
var resultSerializationTime: IndexedSeq[Double],
var gettingResultTime: IndexedSeq[Double],
var schedulerDelay: IndexedSeq[Double],
var peakExecutionMemory: IndexedSeq[Double],
var memoryBytesSpilled: IndexedSeq[Double],
var diskBytesSpilled: IndexedSeq[Double],

@@ -456,6 +510,7 @@ class ShuffleReadMetricDistributionsImpl(
var localBlocksFetched: IndexedSeq[Double],
var fetchWaitTime: IndexedSeq[Double],
var remoteBytesRead: IndexedSeq[Double],
var remoteBytesReadToDisk: IndexedSeq[Double],
var totalBlocksFetched: IndexedSeq[Double]) extends ShuffleReadMetricDistributions

class ShuffleWriteMetricDistributionsImpl(
@@ -468,3 +523,21 @@ class AccumulableInfoImpl(
var name: String,
var update: Option[String],
var value: String) extends AccumulableInfo

class ExecutorMetricDistributionsImpl(
var quantiles: IndexedSeq[Double],
var numTasks: IndexedSeq[Double],
var inputBytes : IndexedSeq[Double],
var inputRecords : IndexedSeq[Double],
var outputBytes : IndexedSeq[Double],
var outputRecords : IndexedSeq[Double],
var shuffleRead : IndexedSeq[Double],
var shuffleReadRecords : IndexedSeq[Double],
var shuffleWrite : IndexedSeq[Double],
var shuffleWriteRecords : IndexedSeq[Double],
var memoryBytesSpilled : IndexedSeq[Double],
var diskBytesSpilled : IndexedSeq[Double],
var peakJvmUsedMemory : IndexedSeq[Double],
var peakExecutionMemory : IndexedSeq[Double],
var peakStorageMemory : IndexedSeq[Double],
var peakUnifiedMemory : IndexedSeq[Double]) extends ExecutorMetricDistributions
@@ -0,0 +1,67 @@
package com.linkedin.drelephant.spark.heuristics

/**
* Adjustments to configuration parameters for fixing flagged issues.
*/
private[heuristics] sealed trait ConfigurationParameterAdjustment[T] {

/**
* Determine if the value should be adjusted.
*
* @param value the value to adjust.
* @return true if the value should be adjusted, false otherwise.
*/
def canAdjust(value: T): Boolean

/** Adjust the value.
*
* @param value the value to adjust.
* @return the adjusted recommended value.
*/
def adjust(value: T): T
}

/** If the number of cores is greater than the threshold, then divide by divisor. */
private[heuristics] case class CoreDivisorAdjustment(
threshold: Int,
divisor: Double) extends ConfigurationParameterAdjustment[Int] {
override def canAdjust(numCores: Int): Boolean = (numCores > threshold)
override def adjust(numCores: Int): Int = Math.ceil(numCores / divisor).toInt
}

/** Set the number of cores to threshold, if the number of cores is greater. */
private[heuristics] case class CoreSetAdjustment(
threshold: Int) extends ConfigurationParameterAdjustment[Int] {
override def canAdjust(numCores: Int): Boolean = (numCores > threshold)
override def adjust(numCores: Int): Int = threshold
}

/** If the memory is less than the threshold, then multiply by multiplier. */
private[heuristics] case class MemoryMultiplierAdjustment(
threshold: Long,
multiplier: Double) extends ConfigurationParameterAdjustment[Long] {
override def canAdjust(memBytes: Long): Boolean = (memBytes < threshold)
override def adjust(memBytes: Long): Long = (memBytes * multiplier).toLong
}

/** If the memory is less than the threshold, then set to the threshold. */
private[heuristics] case class MemorySetAdjustment(
threshold: Long) extends ConfigurationParameterAdjustment[Long] {
override def canAdjust(memBytes: Long): Boolean = (memBytes < threshold)
override def adjust(memBytes: Long): Long = threshold
}

/** If the number of partitions is less than the threshold, then multiply by multiplier. */
private[heuristics] case class PartitionMultiplierAdjustment(
threshold: Int,
multiplier: Double) extends ConfigurationParameterAdjustment[Int] {
override def canAdjust(numPartitions: Int): Boolean = (numPartitions < threshold)
override def adjust(numPartitions: Int): Int = (numPartitions * multiplier).toInt
}

/** If the number of partitions is less than the threshold, then set to threshold. */
private[heuristics] case class PartitionSetAdjustment(
threshold: Int) extends ConfigurationParameterAdjustment[Int] {
override def canAdjust(numPartitions: Int): Boolean = (numPartitions < threshold)
override def adjust(numPartitions: Int): Int = threshold
}
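
A brief usage example of the adjustment classes above; the threshold and memory figures are made up for illustration:

// Example: recommend increasing executor memory when the current setting is below 2 GB.
val memAdjustment = MemoryMultiplierAdjustment(threshold = 2L * 1024 * 1024 * 1024, multiplier = 1.5)
val currentMemBytes = 1L * 1024 * 1024 * 1024 // 1 GB
val recommendedMemBytes =
  if (memAdjustment.canAdjust(currentMemBytes)) memAdjustment.adjust(currentMemBytes) // 1.5 GB
  else currentMemBytes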