-
Notifications
You must be signed in to change notification settings - Fork 857
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LIHADOOP-39635: Add new configuration parameters heuristic #463
Conversation
#438 have my comments ,which are already resolved in this pull request |
// check if executor memory can be lowered | ||
adjustExecutorMemory() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whats the rational behind calling first adjustParametersForLongTasks, adjustParametersForExecutionMemorySpill, adjustParametersForGCandOOM . Is there a priority defined to fix long running task and then memory spill
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ordering is somewhat arbitrary, but there is a better idea of how much to increase the number of partitions for long tasks, and a reasonable estimate for how much to adjust memory/cores/partitions for execution memory spill. GC and OOM are just a guess. It is going from more exact to less exact.
The adjustment(s) for each could affect the other conditions as well (if more partitions are specified due to long tasks, then this would also help with execution memory spill, and OOM/GC).
* If so, either increase cores to make better use of the memory, or decrease executor | ||
* memory. | ||
*/ | ||
private def adjustExecutorMemory() = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decreasing executor memory may cause more spill , how are we handling the same scenario
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there's execution spill, then there shouldn't be too memory allocated. However, it makes sense to be cautious. I'll add a check for execution spill as well, and not adjust the memory in this case.
@edwinalu How are you planning to integrate with Unified Architecture / TuneIn |
@pralabhkumar , yes, I am not sure what would be the best way to integrate with TuneIn. When we discussed a few weeks ago, the idea was to merge the heuristic first. Let's discuss at the meeting. It would be good to keep in sync. |
while (iter.hasNext && continueFn(modified)) { | ||
iter.next() match { | ||
case adjustment: CoreDivisorAdjustment => | ||
if (adjustment.canAdjust(recommendedExecutorCores)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If code for these two cases are similar, then one of them can be removed. Same is true for other cases as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is processing the adjustment differently for each adjustment type. Are you suggesting creating more case classes, CoreAdjustment, MemoryAdjustment, and PartitionAdjustment, and then subclassing the current case classes off those, to consolidate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only related to merging two cases if code is same for these 2 cases.
For example
Case 1:
//Code
Case 2:
//Code
Case 1 | 2 :
//Code:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added traits for CoreAdjustment and MemoryAdjustment, and consolidated the case classes.
|
||
val currentParallelism = sparkExecutorInstances.map(_ * sparkExecutorCores) | ||
|
||
val jvmUsedMemoryHeuristic = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the long run, we need to think about caching these heuristics, so that these need not be computed multiple times. For now this is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed.
* @return the recommended value in bytes for executor memory overhead | ||
*/ | ||
private def calculateExecutorMemoryOverhead(): Option[Long] = { | ||
val overheadMemoryIncrement = 1L * GB_TO_BYTES |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use FileUtils.ONE_GB.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced.
if (stageAnalysis.exists { stage => | ||
hasSignificantSeverity(stage.taskFailureResult.containerKilledSeverity) | ||
}) { | ||
val actualMemoryOverhead = sparkExecutorMemoryOverhead.getOrElse { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we consider user specified memory overhead as well ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is first trying to get the user specified memory overhead, and if this doesn't exist, calculating the default value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed that.
num * MB_TO_BYTES | ||
} else { | ||
unit.charAt(0) match { | ||
case 'T' => num * TB_TO_BYTES |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use FileUtils.ONE_GB, ONE_MB etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced.
* @param size The memory value in long bytes | ||
* @return The formatted string, null if | ||
*/ | ||
private def bytesToString(size: Long): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is available in org.apache.spark.network.util.JavaUtils.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've replaced JstringToBytes with avaUtils.byteStringAsBytes. I wasn't able to find a function for the other direction.
|
||
import com.linkedin.drelephant.analysis.SeverityThresholds | ||
|
||
object ConfigurationUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good to change the name to have ConfigurationHeuristicsConstants or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed.
LGTM , please test the flow on pokemon/EI cluster to make sure things are working fine e2e |
app/com/linkedin/drelephant/spark/heuristics/ConfigurationParameterAdjustment.scala
Show resolved
Hide resolved
…ves. Do not print stack trace for fetching failed tasks.
app/com/linkedin/drelephant/spark/heuristics/ConfigurationParametersHeuristic.scala
Show resolved
Hide resolved
* @param size The memory value in long bytes | ||
* @return The formatted string, null if | ||
*/ | ||
private def bytesToString(size: Long): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MemoryFormatUtils#bytesToString does something similar. It won't print bytes in MB if its < 2GB but otherwise its exactly same. Maybe we can modify MemoryFormatUtils#bytesToString if this condition is necessary or move this method to MemoryFormatUtils too (As this method is good candidate for utils)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other differences are that MemoryFormatUtils#bytesToString also have a space between the value and unit, so "2 GB" instead of "2GB", and it only does GB and MB (no KB, or B) and rounds up. It would be possible to modify MemoryFormatUtils#bytesToString to have additional parameters for specify the threshold for moving to the next unit, which units to use, if it should round up, and if it should add a space or not.
|
||
/** | ||
* This class contains Spark stage information. | ||
*/public class SparkStageData { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's this class meant for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I'm not sure how this got added -- it is not being used. It may have somehow been a merge conflict when copying over from another branch. I'll remove.
val (numTasksWithContainerKilled, containerKilledSeverity) = | ||
checkForSpecificTaskError(stageId, stageData, failedTasks, | ||
StagesWithFailedTasksHeuristic.OVERHEAD_MEMORY_ERROR, | ||
"the container was killed by YARN for exeeding memory limits.", details) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this message correct? For instance, exeeding should ideally be "exceeding". Is the message generated in YarnAllocator in Spark code? If yes, the message may well be incorrect.
Moreover, going ahead, probably Spark can do error categorization by itself and pass a well-defined enum instead of Dr.Elephant expecting a custom message generated in Spark code because such code can break. Error message can still be passed as usual and printed to give user detailed information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this should be "exceeded". For searching the actual error message, it is using StagesWithFailedTasksHeuristic.OVERHEAD_MEMORY_ERROR.
Right now, Spark is returning the error message, which can be varied, if it is coming from the user application. There isn't a well-defined enum for types of errors.
data.appConfigurationProperties | ||
|
||
// current configuration parameters | ||
lazy val sparkExecutorMemory = JavaUtils.byteStringAsBytes( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we use MemoryFormatUtils#stringToBytes which is in Dr.Elephant codebase instead of using Spark utils. It will do exactly the same thing. Utils are typically meant to be used within a project even though they are public classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this does seem to be the same, and I will change.
@@ -211,7 +218,7 @@ class SparkRestClient(sparkConf: SparkConf) { | |||
} | |||
|
|||
private def getStageDatas(attemptTarget: WebTarget): Seq[StageDataImpl] = { | |||
val target = attemptTarget.path("stages") | |||
val target = attemptTarget.path("stages/withSummaries") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is LinkedIn specific REST endpoint and wont work in open source till it's contributed back to Spark upstream. Probably going ahead we should refactor the code and have our own SparkRestClient implementation. The abstraction for us is primarily at the fetcher level. So probably have a linkedin specific spark fetcher implementation which extends SparkFetcher which currently exists, reuses the part where we are fetching event logs but has custom Spark rest client implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is LinkedIn specific, and separating out the code would make sense. Could the refactoring be done later?
* Add new configuration parameters heuristic * add configuration * check for execution memory spill before adjusting executor memory * code review comments * remove partitions * consolidate case classes * add license * add more licenses * remove stage level GC analysis/warnings, due to too many false positives. Do not print stack trace for fetching failed tasks. * code review comments (cherry picked from commit 07c2446)
Add new configuration parameters heuristic, which will list the current values for configuration parameters, and also recommended new values. To determine new values, it will check for:
execution memory spill: this will slow down the application, so try to prevent this by increasing partitions, increasing executor memory, or decreasing cores.
long tasks: this will slow down the application, so try to prevent this by increasing the number of partitions.
task skew: this will slow down the application, so add recommendations for making partitions more even.
OOM or GC: increase memory, increase partitions, or decrease cores, to try to avoid the error.
container killed by YARN errors: increase overhead memory.
unused executor memory, if this is much higher than max JVM used memory; either increase cores or decrease memory.
driver configuration parameters (memory and cores).