
Commit 38263f6

cxzl25 authored and squito committed
[SPARK-27630][CORE] Properly handle task end events from completed stages
## What changes were proposed in this pull request?

Track tasks separately for each stage attempt (instead of tracking by stage), and do NOT reset `numRunningTasks` to 0 on StageCompleted.

In the case of a stage retry, the `taskEnd` event from the zombie stage sometimes makes the number of `totalRunningTasks` negative, which causes the job to get stuck. A similar problem also exists with `stageIdToTaskIndices` and `stageIdToSpeculativeTaskIndices`: a failed `taskEnd` event from the zombie stage removes the task index of the active stage from `stageIdToTaskIndices` or `stageIdToSpeculativeTaskIndices`, and the number of `totalPendingTasks` increases unexpectedly.

## How was this patch tested?

Unit test: "properly handle task end events from completed stages".

Closes apache#24497 from cxzl25/fix_stuck_job_follow_up.

Authored-by: sychen <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
1 parent b71c130 commit 38263f6
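
For context, here is a minimal, self-contained sketch of the accounting idea the patch adopts (illustrative toy code, not Spark's actual classes; all names here are assumptions): running tasks are counted per (stageId, stageAttemptId) key, the counter is only ever decremented by task-end events, and a completed attempt's entry is left alone until its in-flight tasks drain, so a late `taskEnd` from a zombie attempt cannot push the total negative.

```scala
import scala.collection.mutable

// Toy model of per-attempt task accounting (hypothetical, mirrors the patch's idea only).
object StageAttemptTrackingSketch {
  case class StageAttempt(stageId: Int, stageAttemptId: Int)

  private val running = new mutable.HashMap[StageAttempt, Int]

  def onTaskStart(stageId: Int, attemptId: Int): Unit = {
    val key = StageAttempt(stageId, attemptId)
    running(key) = running.getOrElse(key, 0) + 1
  }

  def onTaskEnd(stageId: Int, attemptId: Int): Unit = {
    val key = StageAttempt(stageId, attemptId)
    // Decrement only if the attempt is known; drop the entry once it reaches zero.
    running.get(key).foreach { n =>
      if (n <= 1) running -= key else running(key) = n - 1
    }
  }

  // Stage completion intentionally leaves `running` untouched: a zombie attempt may still
  // have tasks in flight, and their task-end events will drain its counter later.
  def onStageCompleted(stageId: Int, attemptId: Int): Unit = ()

  def totalRunningTasks: Int = running.values.sum

  def main(args: Array[String]): Unit = {
    onTaskStart(0, 0); onTaskStart(0, 0) // two tasks running in attempt 0
    onStageCompleted(0, 0)               // attempt 0 completes/fails while tasks still run (zombie)
    onTaskStart(0, 1)                    // retry attempt starts a task
    onTaskEnd(0, 0)                      // late task-end from the zombie attempt
    assert(totalRunningTasks == 2)       // 1 zombie task + 1 retry task; never negative
    println(s"totalRunningTasks = $totalRunningTasks")
  }
}
```

With the old per-stage bookkeeping, submitting the retry attempt reset the stage's running-task counter to 0 while zombie tasks were still in flight, and their later task-end events drove the shared counter negative; keying by attempt and never resetting avoids that, as the diff below implements.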

File tree

5 files changed: +104 −55 lines changed


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

+65 −48

```diff
@@ -491,6 +491,10 @@ private[spark] class ExecutorAllocationManager(
     numExecutorsToAdd = 1
   }
 
+  private case class StageAttempt(stageId: Int, stageAttemptId: Int) {
+    override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
+  }
+
   /**
    * A listener that notifies the given allocation manager of when to add and remove executors.
    *
@@ -499,29 +503,32 @@ private[spark] class ExecutorAllocationManager(
    */
   private[spark] class ExecutorAllocationListener extends SparkListener {
 
-    private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
-    // Number of running tasks per stage including speculative tasks.
+    private val stageAttemptToNumTasks = new mutable.HashMap[StageAttempt, Int]
+    // Number of running tasks per stageAttempt including speculative tasks.
     // Should be 0 when no stages are active.
-    private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
-    private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
-    // Number of speculative tasks to be scheduled in each stage
-    private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
-    // The speculative tasks started in each stage
-    private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
-
-    // stageId to tuple (the number of task with locality preferences, a map where each pair is a
-    // node and the number of tasks that would like to be scheduled on that node) map,
-    // maintain the executor placement hints for each stage Id used by resource framework to better
-    // place the executors.
-    private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
+    private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
+    // Number of speculative tasks to be scheduled in each stageAttempt
+    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
+    // The speculative tasks started in each stageAttempt
+    private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
+
+    // stageAttempt to tuple (the number of task with locality preferences, a map where each pair
+    // is a node and the number of tasks that would like to be scheduled on that node) map,
+    // maintain the executor placement hints for each stageAttempt used by resource framework
+    // to better place the executors.
+    private val stageAttemptToExecutorPlacementHints =
+      new mutable.HashMap[StageAttempt, (Int, Map[String, Int])]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
       initializing = false
       val stageId = stageSubmitted.stageInfo.stageId
+      val stageAttemptId = stageSubmitted.stageInfo.attemptNumber()
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       val numTasks = stageSubmitted.stageInfo.numTasks
       allocationManager.synchronized {
-        stageIdToNumTasks(stageId) = numTasks
-        stageIdToNumRunningTask(stageId) = 0
+        stageAttemptToNumTasks(stageAttempt) = numTasks
        allocationManager.onSchedulerBacklogged()
 
         // Compute the number of tasks requested by the stage on each host
@@ -536,7 +543,7 @@ private[spark] class ExecutorAllocationManager(
             }
           }
         }
-        stageIdToExecutorPlacementHints.put(stageId,
+        stageAttemptToExecutorPlacementHints.put(stageAttempt,
           (numTasksPending, hostToLocalTaskCountPerStage.toMap))
 
         // Update the executor placement hints
@@ -546,40 +553,44 @@ private[spark] class ExecutorAllocationManager(
 
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
       val stageId = stageCompleted.stageInfo.stageId
+      val stageAttemptId = stageCompleted.stageInfo.attemptNumber()
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       allocationManager.synchronized {
-        stageIdToNumTasks -= stageId
-        stageIdToNumRunningTask -= stageId
-        stageIdToNumSpeculativeTasks -= stageId
-        stageIdToTaskIndices -= stageId
-        stageIdToSpeculativeTaskIndices -= stageId
-        stageIdToExecutorPlacementHints -= stageId
+        // do NOT remove stageAttempt from stageAttemptToNumRunningTasks,
+        // because the attempt may still have running tasks,
+        // even after another attempt for the stage is submitted.
+        stageAttemptToNumTasks -= stageAttempt
+        stageAttemptToNumSpeculativeTasks -= stageAttempt
+        stageAttemptToTaskIndices -= stageAttempt
+        stageAttemptToSpeculativeTaskIndices -= stageAttempt
+        stageAttemptToExecutorPlacementHints -= stageAttempt
 
         // Update the executor placement hints
         updateExecutorPlacementHints()
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
-        if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
+        if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
         }
       }
     }
 
     override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
       val stageId = taskStart.stageId
+      val stageAttemptId = taskStart.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       val taskIndex = taskStart.taskInfo.index
-
       allocationManager.synchronized {
-        if (stageIdToNumRunningTask.contains(stageId)) {
-          stageIdToNumRunningTask(stageId) += 1
-        }
-
+        stageAttemptToNumRunningTask(stageAttempt) =
+          stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) +=
-            taskIndex
+          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
+            new mutable.HashSet[Int]) += taskIndex
         } else {
-          stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToTaskIndices.getOrElseUpdate(stageAttempt,
+            new mutable.HashSet[Int]) += taskIndex
         }
         if (totalPendingTasks() == 0) {
          allocationManager.onSchedulerQueueEmpty()
@@ -588,13 +599,17 @@ private[spark] class ExecutorAllocationManager(
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      val taskIndex = taskEnd.taskInfo.index
       val stageId = taskEnd.stageId
+      val stageAttemptId = taskEnd.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      val taskIndex = taskEnd.taskInfo.index
       allocationManager.synchronized {
-        if (stageIdToNumRunningTask.contains(stageId)) {
-          stageIdToNumRunningTask(stageId) -= 1
+        if (stageAttemptToNumRunningTask.contains(stageAttempt)) {
+          stageAttemptToNumRunningTask(stageAttempt) -= 1
+          if (stageAttemptToNumRunningTask(stageAttempt) == 0) {
+            stageAttemptToNumRunningTask -= stageAttempt
+          }
         }
-
         // If the task failed, we expect it to be resubmitted later. To ensure we have
         // enough resources to run the resubmitted task, we need to mark the scheduler
         // as backlogged again if it's not already marked as such (SPARK-8366)
@@ -603,21 +618,22 @@ private[spark] class ExecutorAllocationManager(
             allocationManager.onSchedulerBacklogged()
           }
           if (taskEnd.taskInfo.speculative) {
-            stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+            stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
           } else {
-            stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+            stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
           }
         }
       }
     }
 
     override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)
       : Unit = {
-      val stageId = speculativeTask.stageId
-
+      val stageId = speculativeTask.stageId
+      val stageAttemptId = speculativeTask.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       allocationManager.synchronized {
-        stageIdToNumSpeculativeTasks(stageId) =
-          stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+        stageAttemptToNumSpeculativeTasks(stageAttempt) =
+          stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
         allocationManager.onSchedulerBacklogged()
       }
     }
@@ -629,14 +645,14 @@ private[spark] class ExecutorAllocationManager(
     * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
     */
    def pendingTasks(): Int = {
-      stageIdToNumTasks.map { case (stageId, numTasks) =>
-        numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
+      stageAttemptToNumTasks.map { case (stageAttempt, numTasks) =>
+        numTasks - stageAttemptToTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
      }.sum
    }
 
    def pendingSpeculativeTasks(): Int = {
-      stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) =>
-        numTasks - stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0)
+      stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
+        numTasks - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
      }.sum
    }
 
@@ -646,9 +662,10 @@ private[spark] class ExecutorAllocationManager(
 
    /**
     * The number of tasks currently running across all stages.
+    * Include running-but-zombie stage attempts
     */
    def totalRunningTasks(): Int = {
-      stageIdToNumRunningTask.values.sum
+      stageAttemptToNumRunningTask.values.sum
    }
 
    /**
@@ -662,7 +679,7 @@ private[spark] class ExecutorAllocationManager(
    def updateExecutorPlacementHints(): Unit = {
      var localityAwareTasks = 0
      val localityToCount = new mutable.HashMap[String, Int]()
-      stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
+      stageAttemptToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
        localityAwareTasks += numTasksPending
        localities.foreach { case (hostname, count) =>
          val updatedCount = localityToCount.getOrElse(hostname, 0) + count
```
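
A small aside on the new map key (an observation about the mechanism, not something stated in the patch): because `StageAttempt` is a case class, it gets structural `equals`/`hashCode`, so a key rebuilt from the same `(stageId, attemptNumber)` in a later listener callback lands on the same `HashMap` entry; the `toString` override only affects how the key renders in logs. A tiny illustrative snippet (the demo object is hypothetical):

```scala
import scala.collection.mutable

// Mirrors the key type added in the patch; the surrounding demo object is hypothetical.
case class StageAttempt(stageId: Int, stageAttemptId: Int) {
  override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
}

object StageAttemptKeyDemo extends App {
  val running = mutable.HashMap[StageAttempt, Int]()
  running(StageAttempt(3, 1)) = 2           // recorded by one callback
  running(StageAttempt(3, 1)) -= 1          // a later callback rebuilds an equal key
  assert(running(StageAttempt(3, 1)) == 1)  // same entry, thanks to structural equality
  println(StageAttempt(3, 1))               // prints: Stage 3 (Attempt 1)
}
```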

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

+1 −1

```diff
@@ -933,7 +933,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
+    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
   }
 
   private[scheduler] def handleTaskSetFailed(
```

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

+4 −1

```diff
@@ -52,7 +52,10 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T
 case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
+case class SparkListenerSpeculativeTaskSubmitted(
+    stageId: Int,
+    stageAttemptId: Int = 0)
+  extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerTaskEnd(
```
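
The default `stageAttemptId: Int = 0` keeps existing source that constructs the event with only a stage id compiling unchanged; binary compatibility for the changed `apply`/`copy`/constructor signatures is handled by the MimaExcludes entries added further below. As an illustration (the listener class here is hypothetical, but the callback and event fields are the ones used in the diff above), a custom listener can now read the attempt number:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerSpeculativeTaskSubmitted}

// Hypothetical listener; onSpeculativeTaskSubmitted is the same callback the
// ExecutorAllocationListener overrides in the diff above.
class SpeculationLogger extends SparkListener {
  override def onSpeculativeTaskSubmitted(
      speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = {
    println(s"speculative task for stage ${speculativeTask.stageId}, " +
      s"attempt ${speculativeTask.stageAttemptId}")
  }
}

// Both construction forms compile; the second relies on the new default value:
//   SparkListenerSpeculativeTaskSubmitted(stageId = 2, stageAttemptId = 1)
//   SparkListenerSpeculativeTaskSubmitted(stageId = 2)
```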

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

+28 −5

```diff
@@ -254,25 +254,47 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a speculative task doesn't affect the target
-    post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
+    post(SparkListenerTaskStart(1, 0, createTaskInfo(1, 0, "executor-2", true)))
     assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
   }
 
-  test("ignore task end events from completed stages") {
+  test("properly handle task end events from completed stages") {
     val manager = createManager(createConf(0, 10, 0))
+
+    // We simulate having a stage fail, but with tasks still running. Then another attempt for
+    // that stage is started, and we get task completions from the first stage attempt. Make sure
+    // the value of `totalTasksRunning` is consistent as tasks finish from both attempts (we count
+    // all running tasks, from the zombie & non-zombie attempts)
     val stage = createStageInfo(0, 5)
     post(SparkListenerStageSubmitted(stage))
     val taskInfo1 = createTaskInfo(0, 0, "executor-1")
     val taskInfo2 = createTaskInfo(1, 1, "executor-1")
     post(SparkListenerTaskStart(0, 0, taskInfo1))
     post(SparkListenerTaskStart(0, 0, taskInfo2))
 
+    // The tasks in the zombie attempt haven't completed yet, so we still count them
     post(SparkListenerStageCompleted(stage))
 
+    // There are still two tasks that belong to the zombie stage running.
+    assert(totalRunningTasks(manager) === 2)
+
+    // submit another attempt for the stage. We count completions from the first zombie attempt
+    val stageAttempt1 = createStageInfo(stage.stageId, 5, attemptId = 1)
+    post(SparkListenerStageSubmitted(stageAttempt1))
     post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, null))
-    post(SparkListenerTaskEnd(2, 0, null, Success, taskInfo2, null))
+    assert(totalRunningTasks(manager) === 1)
+    val attemptTaskInfo1 = createTaskInfo(3, 0, "executor-1")
+    val attemptTaskInfo2 = createTaskInfo(4, 1, "executor-1")
+    post(SparkListenerTaskStart(0, 1, attemptTaskInfo1))
+    post(SparkListenerTaskStart(0, 1, attemptTaskInfo2))
+    assert(totalRunningTasks(manager) === 3)
+    post(SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo1, null))
+    assert(totalRunningTasks(manager) === 2)
+    post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo2, null))
+    assert(totalRunningTasks(manager) === 1)
+    post(SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo2, null))
     assert(totalRunningTasks(manager) === 0)
   }
 
@@ -1033,9 +1055,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private def createStageInfo(
       stageId: Int,
       numTasks: Int,
-      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
+      attemptId: Int = 0
     ): StageInfo = {
-    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details",
+    new StageInfo(stageId, attemptId, "name", numTasks, Seq.empty, Seq.empty, "no details",
       taskLocalityPreferences = taskLocalityPreferences)
   }
 
```

project/MimaExcludes.scala

+6 −0

```diff
@@ -79,6 +79,12 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.this"),
     ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$"),
 
+    // [SPARK-27630][CORE] Properly handle task end events from completed stages
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.apply"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.copy"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.this"),
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted$"),
+
     // [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.SparkTransportConf.fromSparkConf"),
 
```
