Executor time duration change #440

Open. Wants to merge 16 commits into base: customSHSWork.
12 changes: 7 additions & 5 deletions app-conf/FetcherConf.xml
@@ -58,17 +58,16 @@
To work properly, this fetcher should use the same timezone as the job history server.
If not set, the local timezone will be used.
-->

<fetcher>
<applicationtype>mapreduce</applicationtype>
<classname>com.linkedin.drelephant.mapreduce.fetchers.MapReduceFSFetcherHadoop2</classname>
<params>
<sampling_enabled>false</sampling_enabled>
<history_log_size_limit_in_mb>500</history_log_size_limit_in_mb>
<history_server_time_zone>PST</history_server_time_zone>
<sampling_enabled>false</sampling_enabled>
<history_server_time_zone>UTC</history_server_time_zone>
</params>
</fetcher>


<!--
FSFetcher for Spark. Loads the eventlog from HDFS and replays to get the metrics and application properties
@@ -88,7 +87,10 @@
-->
<fetcher>
<applicationtype>spark</applicationtype>
<classname>com.linkedin.drelephant.spark.fetchers.FSFetcher</classname>
<classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
<params>
<fetch_failed_tasks>true</fetch_failed_tasks>
</params>
</fetcher>
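
For deployments whose history server does not expose failed-task data, a sketch of the same block with the new flag turned off (the parameter name matches the FETCH_FAILED_TASKS constant added to SparkFetcher later in this diff):

<!-- Sketch: same fetcher with failed-task fetching disabled. -->
<fetcher>
  <applicationtype>spark</applicationtype>
  <classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
  <params>
    <fetch_failed_tasks>false</fetch_failed_tasks>
  </params>
</fetcher>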

<!--
66 changes: 64 additions & 2 deletions app-conf/HeuristicConf.xml
@@ -292,29 +292,91 @@
<classname>com.linkedin.drelephant.spark.heuristics.ConfigurationHeuristic</classname>
<viewname>views.html.help.spark.helpConfigurationHeuristic</viewname>
</heuristic>
<heuristic>

<!--<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Executor Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorsHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorsHeuristic</viewname>
</heuristic>
</heuristic>-->

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Job Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.JobsHeuristic</classname>
<viewname>views.html.help.spark.helpJobsHeuristic</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Stage Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor Peak Unified Memory</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.UnifiedMemoryHeuristic</classname>
<viewname>views.html.help.spark.helpUnifiedMemoryHeuristic</viewname>
<!--<params>
<peak_unified_memory_threshold>0.7,0.6,0.4,0.2</peak_unified_memory_threshold>
<spark_executor_memory_threshold_key>2G</spark_executor_memory_threshold_key>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor JVM Used Memory</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.JvmUsedMemoryHeuristic</classname>
<viewname>views.html.help.spark.helpJvmUsedMemoryHeuristic</viewname>
<!--<params>
<executor_peak_jvm_memory_threshold>1.25,1.5,2,3</executor_peak_jvm_memory_threshold>
<spark_executor_memory_threshold_key>2G</spark_executor_memory_threshold_key>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Stages with failed tasks</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.StagesWithFailedTasksHeuristic</classname>
<viewname>views.html.help.spark.helpStagesWithFailedTasks</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor GC</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorGcHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorGcHeuristic</viewname>
<!--<params>
<gc_severity_A_threshold>0.08,0.09,0.1,0.15</gc_severity_A_threshold>
<gc_severity_D_threshold>0.05,0.04,0.03,0.01</gc_severity_D_threshold>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor spill</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorStorageSpillHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorStorageSpillHeuristic</viewname>
<!--<params>
<spill_fraction_of_executors_threshold>0.2</spill_fraction_of_executors_threshold>
<spill_max_memory_threshold>0.05</spill_max_memory_threshold>
<spark_executor_cores_threshold>4</spark_executor_cores_threshold>
<spark_executor_memory_threshold>10GB</spark_executor_memory_threshold>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Driver Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.DriverHeuristic</classname>
<viewname>views.html.help.spark.helpDriverHeuristic</viewname>
<!--<params>
<driver_peak_jvm_memory_threshold>1.25,1.5,2,3</driver_peak_jvm_memory_threshold>
<gc_severity_threshold>0.08,0.09,0.1,0.15</gc_severity_threshold>
<spark_executor_memory_threshold_key>2G</spark_executor_memory_threshold_key>
</params>-->
</heuristic>

</heuristics>
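
Each commented <params> block above documents the heuristic's tunable thresholds; a sketch of enabling one, assuming (as elsewhere in Dr. Elephant) that the four comma-separated values map to ascending severity levels:

<heuristic>
  <applicationtype>spark</applicationtype>
  <heuristicname>Executor GC</heuristicname>
  <classname>com.linkedin.drelephant.spark.heuristics.ExecutorGcHeuristic</classname>
  <viewname>views.html.help.spark.helpExecutorGcHeuristic</viewname>
  <params>
    <gc_severity_A_threshold>0.08,0.09,0.1,0.15</gc_severity_A_threshold>
    <gc_severity_D_threshold>0.05,0.04,0.03,0.01</gc_severity_D_threshold>
  </params>
</heuristic>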
1 change: 0 additions & 1 deletion app/com/linkedin/drelephant/ElephantContext.java
@@ -110,7 +110,6 @@ private void loadConfiguration() {
loadFetchers();
loadHeuristics();
loadJobTypes();

loadGeneralConf();
loadAutoTuningConf();

app/com/linkedin/drelephant/analysis/HadoopAggregatedData.java
@@ -21,6 +21,7 @@
*/
public class HadoopAggregatedData {

// Note: resourceUsed is actually the resource blocked on the cluster, i.e., what was allocated rather than consumed.
private long resourceUsed = 0;
private long resourceWasted = 0;
private long totalDelay = 0;
40 changes: 23 additions & 17 deletions app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
@@ -19,10 +19,12 @@ package com.linkedin.drelephant.spark
import com.linkedin.drelephant.analysis.{HadoopAggregatedData, HadoopApplicationData, HadoopMetricsAggregator}
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
import com.linkedin.drelephant.util.MemoryFormatUtils
import org.apache.commons.io.FileUtils
import org.apache.log4j.Logger

import scala.util.Try


@@ -47,19 +49,13 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
}

private def aggregate(data: SparkApplicationData): Unit = for {
executorInstances <- executorInstancesOf(data)
executorMemoryBytes <- executorMemoryBytesOf(data)
} {
val applicationDurationMillis = applicationDurationMillisOf(data)
if( applicationDurationMillis < 0) {
logger.warn(s"applicationDurationMillis is negative. Skipping Metrics Aggregation:${applicationDurationMillis}")
} else {
val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)

val resourcesAllocatedForUse =
aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)

} else {
var (resourcesActuallyUsed, resourcesAllocatedForUse) = calculateResourceUsage(data.executorSummaries, executorMemoryBytes)
val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
@@ -71,10 +67,8 @@
} else {
logger.warn(s"resourcesAllocatedForUse/resourcesWasted exceeds Long.MaxValue")
logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}")
logger.warn(s"executorInstances: ${executorInstances}")
logger.warn(s"executorMemoryBytes:${executorMemoryBytes}")
logger.warn(s"applicationDurationMillis:${applicationDurationMillis}")
logger.warn(s"totalExecutorTaskTimeMillis:${totalExecutorTaskTimeMillis}")
logger.warn(s"resourcesActuallyUsedWithBuffer:${resourcesActuallyUsedWithBuffer}")
logger.warn(s"resourcesWastedMBSeconds:${resourcesWastedMBSeconds}")
logger.warn(s"allocatedMemoryWasteBufferPercentage:${allocatedMemoryWasteBufferPercentage}")
@@ -83,16 +77,28 @@
}
}
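
A worked example of the waste computation above, with illustrative numbers: an application that was allocated 1,000,000 MB-seconds but actually used 400,000 MB-seconds, under the default 50% waste buffer:

// Sketch with made-up values; mirrors the branches above.
val resourcesAllocatedForUse = BigInt(1000000)                       // MB-seconds
val resourcesActuallyUsed    = BigInt(400000)                        // MB-seconds
val resourcesActuallyUsedWithBuffer =
  resourcesActuallyUsed.doubleValue * (1.0 + 0.5)                    // 600000.0
val resourcesWastedMBSeconds =
  if (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue)
    resourcesAllocatedForUse.doubleValue - resourcesActuallyUsedWithBuffer  // 400000.0
  else 0.0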

private def aggregateresourcesActuallyUsed(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = {
val bytesMillis = BigInt(executorMemoryBytes) * totalExecutorTaskTimeMillis
(bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
// Calculates the resource usage by summing up the resources used per executor
private def calculateResourceUsage(executorSummaries: Seq[ExecutorSummary], executorMemoryBytes: Long): (BigInt, BigInt) = {
var sumResourceUsage: BigInt = 0
var sumResourcesAllocatedForUse : BigInt = 0
executorSummaries.foreach(
executorSummary => {
var memUsedBytes: Long = executorSummary.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0).asInstanceOf[Number].longValue + MemoryFormatUtils.stringToBytes(SPARK_RESERVED_MEMORY)
var timeSpent: Long = executorSummary.totalDuration
val bytesMillisUsed = BigInt(memUsedBytes) * timeSpent
val bytesMillisAllocated = BigInt(executorMemoryBytes) * timeSpent
sumResourcesAllocatedForUse += (bytesMillisAllocated / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
sumResourceUsage += (bytesMillisUsed / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
}
)
(sumResourceUsage, sumResourcesAllocatedForUse)
}
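
To make the per-executor accounting concrete, a sketch with illustrative numbers: one executor allocated spark.executor.memory = 4 GB whose peak JVM used memory was 1.5 GB, alive for 600 seconds (the 300M addend matches SPARK_RESERVED_MEMORY below):

// Illustrative values only.
val executorMemoryBytes = 4096L * 1024 * 1024   // 4 GB allocated per executor
val peakJvmUsedBytes    = 1536L * 1024 * 1024   // 1.5 GB peak JVM used memory
val reservedBytes       = 300L * 1024 * 1024    // SPARK_RESERVED_MEMORY
val durationMillis      = 600L * 1000
val mbMillis            = 1024L * 1024 * 1000   // bytes * ms -> MB-seconds

val usedMBSeconds      = (peakJvmUsedBytes + reservedBytes) * durationMillis / mbMillis // 1101600
val allocatedMBSeconds = executorMemoryBytes * durationMillis / mbMillis                // 2457600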

private def aggregateresourcesAllocatedForUse(
executorInstances: Int,
executorMemoryBytes: Long,
applicationDurationMillis: Long
): BigInt = {
): BigInt = {
val bytesMillis = BigInt(executorInstances) * BigInt(executorMemoryBytes) * BigInt(applicationDurationMillis)
(bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
}
@@ -121,9 +127,9 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
object SparkMetricsAggregator {
/** The percentage of allocated memory we expect to waste because of overhead. */
val DEFAULT_ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE = 0.5D

val ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE_KEY = "allocated_memory_waste_buffer_percentage"

val SPARK_RESERVED_MEMORY: String = "300M"
val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
val JVM_USED_MEMORY = "jvmUsedMemory"
}
app/com/linkedin/drelephant/spark/data/SparkApplicationData.scala
@@ -23,14 +23,14 @@ import scala.collection.JavaConverters
import com.linkedin.drelephant.analysis.{ApplicationType, HadoopApplicationData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfo, ExecutorSummary, JobData, StageData}


case class SparkApplicationData(
appId: String,
appConfigurationProperties: Map[String, String],
applicationInfo: ApplicationInfo,
jobDatas: Seq[JobData],
stageDatas: Seq[StageData],
executorSummaries: Seq[ExecutorSummary]
executorSummaries: Seq[ExecutorSummary],
stagesWithFailedTasks: Seq[StageData]
) extends HadoopApplicationData {
import SparkApplicationData._
import JavaConverters._
@@ -65,6 +65,7 @@ object SparkApplicationData {
val jobDatas = restDerivedData.jobDatas
val stageDatas = restDerivedData.stageDatas
val executorSummaries = restDerivedData.executorSummaries
apply(appId, appConfigurationProperties, applicationInfo, jobDatas, stageDatas, executorSummaries)
val stagesWithFailedTasks = restDerivedData.stagesWithFailedTasks
apply(appId, appConfigurationProperties, applicationInfo, jobDatas, stageDatas, executorSummaries, stagesWithFailedTasks)
}
}
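
A minimal sketch (field name assumed to mirror Spark's stage API) of how a heuristic can consume the new stagesWithFailedTasks field:

// Count task failures across the pre-filtered stages.
def totalFailedTasks(data: SparkApplicationData): Int =
  data.stagesWithFailedTasks.map(_.numFailedTasks).sum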
app/com/linkedin/drelephant/spark/data/SparkRestDerivedData.scala
@@ -24,4 +24,5 @@ case class SparkRestDerivedData(
jobDatas: Seq[JobData],
stageDatas: Seq[StageData],
executorSummaries: Seq[ExecutorSummary],
stagesWithFailedTasks: Seq[StageData],
private[spark] val logDerivedData: Option[SparkLogDerivedData] = None)
app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -112,7 +112,6 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = Future {
val appId = analyticJob.getAppId
val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT)

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
case EventLogSource.Rest => restDerivedData.logDerivedData
@@ -146,4 +145,5 @@ object SparkFetcher {
val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
val DEFAULT_TIMEOUT = Duration(5, SECONDS)
val LOG_LOCATION_URI_XML_FIELD = "event_log_location_uri"
val FETCH_FAILED_TASKS = "fetch_failed_tasks"
}
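
A sketch (assumed wiring, not necessarily the PR's exact code) of how the new FETCH_FAILED_TASKS key can be resolved from the fetcher's <params> block and threaded into the REST client:

val fetchFailedTasks: Boolean =
  Option(fetcherConfigurationData.getParamMap.get(FETCH_FAILED_TASKS))
    .map(_.toBoolean)
    .getOrElse(true) // fetchData also defaults fetchFailedTasks to true
val restDerivedData = Await.result(
  sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest, fetchFailedTasks),
  DEFAULT_TIMEOUT)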
app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala
@@ -62,7 +62,7 @@ class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf, e
val (eventLogPath, eventLogCodec) =
sparkUtils.pathAndCodecforEventLog(sparkConf, eventLogFileSystem, baseEventLogPath, appId, attemptId)

Future {
Future {
sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec)(findDerivedData(_))
}
}
17 changes: 15 additions & 2 deletions app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -75,7 +75,7 @@ class SparkRestClient(sparkConf: SparkConf) {

private val apiTarget: WebTarget = client.property(ClientProperties.CONNECT_TIMEOUT, CONNECTION_TIMEOUT).property(ClientProperties.READ_TIMEOUT, READ_TIMEOUT).target(historyServerUri).path(API_V1_MOUNT_PATH)

def fetchData(appId: String, fetchLogs: Boolean = false)(
def fetchData(appId: String, fetchLogs: Boolean = false, fetchFailedTasks: Boolean = true)(
implicit ec: ExecutionContext
): Future[SparkRestDerivedData] = {
val (applicationInfo, attemptTarget) = getApplicationMetaData(appId)
@@ -101,6 +101,7 @@
Await.result(futureJobDatas, DEFAULT_TIMEOUT),
Await.result(futureStageDatas, DEFAULT_TIMEOUT),
Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
Seq.empty,
Await.result(futureLogData, Duration(5, SECONDS))
)

@@ -223,7 +224,7 @@ }
}

private def getExecutorSummaries(attemptTarget: WebTarget): Seq[ExecutorSummaryImpl] = {
val target = attemptTarget.path("executors")
val target = attemptTarget.path("allexecutors")
try {
get(target, SparkRestObjectMapper.readValue[Seq[ExecutorSummaryImpl]])
} catch {
@@ -234,6 +235,18 @@
}
}
}

private def getStagesWithFailedTasks(attemptTarget: WebTarget): Seq[StageDataImpl] = {
val target = attemptTarget.path("stages/failedTasks")
try {
get(target, SparkRestObjectMapper.readValue[Seq[StageDataImpl]])
} catch {
case NonFatal(e) => {
logger.error(s"error reading failedTasks ${target.getUri}", e)
throw e
}
}
}
}
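
For reference, a sketch of the two endpoint paths the client above now targets (history server URI and application/attempt IDs are illustrative; allexecutors is a standard history server route that, unlike executors, also includes removed executors, while stages/failedTasks is assumed to exist only on the custom history server that the customSHSWork base branch targets):

import javax.ws.rs.client.{ClientBuilder, WebTarget}

val attempt: WebTarget = ClientBuilder.newClient()
  .target("http://historyserver:18080").path("api/v1")
  .path("applications").path("application_1234_0001").path("1")
val allExecutors     = attempt.path("allexecutors")        // dead executors included
val failedTaskStages = attempt.path("stages/failedTasks")  // custom SHS endpoint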

object SparkRestClient {
app/com/linkedin/drelephant/spark/fetchers/statusapiv1/api.scala
@@ -81,13 +81,20 @@ trait ExecutorSummary{
def failedTasks: Int
def completedTasks: Int
def totalTasks: Int
def maxTasks: Int
def totalDuration: Long
def addTime: Date
def endTime: Option[Date]
def totalInputBytes: Long
def totalShuffleRead: Long
def totalShuffleWrite: Long
def maxMemory: Long
def totalGCTime: Long
def executorLogs: Map[String, String]}
def totalMemoryBytesSpilled: Long
def executorLogs: Map[String, String]
def peakJvmUsedMemory: Map[String, Long]
def peakUnifiedMemory: Map[String, Long]
}
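
Both new maps are keyed by metric name; a sketch of how a consumer reads the peak JVM metric (the "jvmUsedMemory" key matches the JVM_USED_MEMORY constant used by the aggregator earlier in this diff):

val peakJvmUsed: Long =
  executorSummary.peakJvmUsedMemory.getOrElse("jvmUsedMemory", 0L)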

trait JobData{
def jobId: Int
@@ -160,7 +167,7 @@ trait StageData{
def schedulingPool: String

def accumulatorUpdates: Seq[AccumulableInfo]
def tasks: Option[Map[Long, TaskData]]
def tasks: Option[Map[Long, TaskDataImpl]]
def executorSummary: Option[Map[String, ExecutorStageSummary]]}

trait TaskData{
@@ -287,13 +294,19 @@ class ExecutorSummaryImpl(
var failedTasks: Int,
var completedTasks: Int,
var totalTasks: Int,
var maxTasks: Int,
var totalDuration: Long,
var addTime: Date,
var endTime: Option[Date],
var totalInputBytes: Long,
var totalShuffleRead: Long,
var totalShuffleWrite: Long,
var maxMemory: Long,
var totalGCTime: Long,
var executorLogs: Map[String, String]) extends ExecutorSummary
var totalMemoryBytesSpilled: Long,
var executorLogs: Map[String, String],
var peakJvmUsedMemory: Map[String, Long],
var peakUnifiedMemory: Map[String, Long]) extends ExecutorSummary

class JobDataImpl(
var jobId: Int,
@@ -366,7 +379,7 @@ class StageDataImpl(
var schedulingPool: String,

var accumulatorUpdates: Seq[AccumulableInfoImpl],
var tasks: Option[Map[Long, TaskData]],
var tasks: Option[Map[Long, TaskDataImpl]],
var executorSummary: Option[Map[String, ExecutorStageSummaryImpl]]) extends StageData

class TaskDataImpl(