
Commit bfb3ffe

[SPARK-27682][CORE][GRAPHX][MLLIB] Replace use of collections and methods that will be removed in Scala 2.13 with work-alikes
## What changes were proposed in this pull request?

This replaces use of collection classes like `MutableList` and `ArrayStack` with work-alikes that are available in 2.12, as they will be removed in 2.13. It also removes use of `.to[Collection]`, as its use was superfluous anyway. Removing `collection.breakOut` will have to wait until 2.13.

## How was this patch tested?

Existing tests.

Closes apache#24586 from srowen/SPARK-27682.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent fd9acf2 commit bfb3ffe
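
As a quick orientation before the per-file diffs, here is a minimal, hypothetical sketch (plain Scala 2.12, not code from this patch) of the three substitution patterns the commit applies: `MutableList` to `ArrayBuffer`, `ArrayStack` to a `ListBuffer` used with stack discipline, and dropping `.to[Collection]` where a concrete buffer is not actually needed.

```scala
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object CollectionWorkalikes extends App {
  // MutableList -> ArrayBuffer: same append-and-iterate usage, and ArrayBuffer
  // remains in Scala 2.13.
  val names = ArrayBuffer.empty[String]
  names += "a"
  names ++= Seq("b", "c")
  println(names)                // ArrayBuffer(a, b, c)

  // ArrayStack -> ListBuffer with stack discipline:
  // push becomes prepend, pop becomes remove(0), top becomes head.
  val stack = new ListBuffer[Int]
  stack.prepend(1)              // push(1)
  stack.prepend(2)              // push(2)
  println(stack.head)           // top: prints 2
  println(stack.remove(0))      // pop(): prints 2

  // `.to[ArrayBuffer]` is gone in 2.13; where a concrete buffer is still wanted,
  // build it explicitly from the elements instead.
  val buf = ArrayBuffer(Iterator.from(0).take(5).toSeq: _*)
  println(buf)                  // ArrayBuffer(0, 1, 2, 3, 4)
}
```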

File tree

11 files changed, +71 -60 lines


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

+25 -25

```diff
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.annotation.tailrec
 import scala.collection.Map
-import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
@@ -468,21 +468,21 @@ private[spark] class DAGScheduler(
 
   /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
   private def getMissingAncestorShuffleDependencies(
-      rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
-    val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
+      rdd: RDD[_]): ListBuffer[ShuffleDependency[_, _, _]] = {
+    val ancestors = new ListBuffer[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new ArrayStack[RDD[_]]
-    waitingForVisit.push(rdd)
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
     while (waitingForVisit.nonEmpty) {
-      val toVisit = waitingForVisit.pop()
+      val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
        visited += toVisit
        getShuffleDependencies(toVisit).foreach { shuffleDep =>
          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
-            ancestors.push(shuffleDep)
-            waitingForVisit.push(shuffleDep.rdd)
+            ancestors.prepend(shuffleDep)
+            waitingForVisit.prepend(shuffleDep.rdd)
          } // Otherwise, the dependency and its ancestors have already been registered.
        }
      }
@@ -506,17 +506,17 @@ private[spark] class DAGScheduler(
       rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
     val parents = new HashSet[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
-    val waitingForVisit = new ArrayStack[RDD[_]]
-    waitingForVisit.push(rdd)
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
     while (waitingForVisit.nonEmpty) {
-      val toVisit = waitingForVisit.pop()
+      val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
         visited += toVisit
         toVisit.dependencies.foreach {
           case shuffleDep: ShuffleDependency[_, _, _] =>
             parents += shuffleDep
           case dependency =>
-            waitingForVisit.push(dependency.rdd)
+            waitingForVisit.prepend(dependency.rdd)
         }
       }
     }
@@ -529,10 +529,10 @@ private[spark] class DAGScheduler(
    */
   private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => Boolean): Boolean = {
     val visited = new HashSet[RDD[_]]
-    val waitingForVisit = new ArrayStack[RDD[_]]
-    waitingForVisit.push(rdd)
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
     while (waitingForVisit.nonEmpty) {
-      val toVisit = waitingForVisit.pop()
+      val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
         if (!predicate(toVisit)) {
           return false
@@ -542,7 +542,7 @@ private[spark] class DAGScheduler(
           case _: ShuffleDependency[_, _, _] =>
             // Not within the same stage with current rdd, do nothing.
           case dependency =>
-            waitingForVisit.push(dependency.rdd)
+            waitingForVisit.prepend(dependency.rdd)
         }
       }
     }
@@ -554,7 +554,8 @@ private[spark] class DAGScheduler(
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new ArrayStack[RDD[_]]
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += stage.rdd
     def visit(rdd: RDD[_]) {
       if (!visited(rdd)) {
         visited += rdd
@@ -568,15 +569,14 @@ private[spark] class DAGScheduler(
                   missing += mapStage
                 }
               case narrowDep: NarrowDependency[_] =>
-                waitingForVisit.push(narrowDep.rdd)
+                waitingForVisit.prepend(narrowDep.rdd)
             }
           }
         }
       }
     }
-    waitingForVisit.push(stage.rdd)
     while (waitingForVisit.nonEmpty) {
-      visit(waitingForVisit.pop())
+      visit(waitingForVisit.remove(0))
     }
     missing.toList
   }
@@ -2000,7 +2000,8 @@ private[spark] class DAGScheduler(
     val visitedRdds = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new ArrayStack[RDD[_]]
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += stage.rdd
     def visit(rdd: RDD[_]) {
       if (!visitedRdds(rdd)) {
         visitedRdds += rdd
@@ -2009,17 +2010,16 @@ private[spark] class DAGScheduler(
             case shufDep: ShuffleDependency[_, _, _] =>
               val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
               if (!mapStage.isAvailable) {
-                waitingForVisit.push(mapStage.rdd)
+                waitingForVisit.prepend(mapStage.rdd)
               } // Otherwise there's no need to follow the dependency back
             case narrowDep: NarrowDependency[_] =>
-              waitingForVisit.push(narrowDep.rdd)
+              waitingForVisit.prepend(narrowDep.rdd)
           }
         }
       }
     }
-    waitingForVisit.push(stage.rdd)
     while (waitingForVisit.nonEmpty) {
-      visit(waitingForVisit.pop())
+      visit(waitingForVisit.remove(0))
     }
     visitedRdds.contains(target.rdd)
   }
```
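
The `ListBuffer` version keeps these traversals LIFO: `prepend` plus `remove(0)` behaves like `ArrayStack.push` plus `pop`, so the manually maintained stack still drives a depth-first walk. A self-contained sketch over a toy dependency graph (no Spark types; the names are illustrative only):

```scala
import scala.collection.mutable.ListBuffer

object LifoTraversal extends App {
  // Toy dependency graph: node -> the nodes it depends on.
  val deps = Map(1 -> Seq(2, 3), 2 -> Seq(4), 3 -> Seq.empty[Int], 4 -> Seq.empty[Int])

  val waitingForVisit = new ListBuffer[Int]
  val visitOrder = ListBuffer.empty[Int]
  waitingForVisit += 1                                     // seed, like `waitingForVisit += rdd`
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)                // pop()
    visitOrder += toVisit
    deps(toVisit).foreach(d => waitingForVisit.prepend(d)) // push(d)
  }
  println(visitOrder.mkString(" "))                        // 1 3 2 4, the order ArrayStack would give
}
```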

core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala

+2 -2

```diff
@@ -333,7 +333,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
 
     // ArrayBuffer iterator (indexable type)
     d = medianKSD(
-      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+      gaps(sampler.sample(ArrayBuffer(Iterator.from(0).take(20*sampleSize).toSeq: _*).iterator)),
       gaps(sample(Iterator.from(0), 0.1)))
     d should be < D
 
@@ -557,7 +557,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
 
     // ArrayBuffer iterator (indexable type)
     d = medianKSD(
-      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+      gaps(sampler.sample(ArrayBuffer(Iterator.from(0).take(20*sampleSize).toSeq: _*).iterator)),
       gaps(sampleWR(Iterator.from(0), 0.1)))
     d should be < D
 
```

graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala

+7 -3

```diff
@@ -17,6 +17,7 @@
 
 package org.apache.spark.graphx.lib
 
+import scala.collection.{mutable, Map}
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
@@ -51,11 +52,14 @@ object LabelPropagation {
     }
     def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
       : Map[VertexId, Long] = {
-      (count1.keySet ++ count2.keySet).map { i =>
+      // Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
+      val map = mutable.Map[VertexId, Long]()
+      (count1.keySet ++ count2.keySet).foreach { i =>
         val count1Val = count1.getOrElse(i, 0L)
         val count2Val = count2.getOrElse(i, 0L)
-        i -> (count1Val + count2Val)
-      }(collection.breakOut)
+        map.put(i, count1Val + count2Val)
+      }
+      map
     }
     def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
       if (message.isEmpty) attr else message.maxBy(_._2)._1
```

graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala

+7 -3

```diff
@@ -17,6 +17,7 @@
 
 package org.apache.spark.graphx.lib
 
+import scala.collection.{mutable, Map}
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
@@ -34,9 +35,12 @@ object ShortestPaths extends Serializable {
   private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
 
   private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
-    (spmap1.keySet ++ spmap2.keySet).map {
-      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
-    }(collection.breakOut)
+    // Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
+    val map = mutable.Map[VertexId, Int]()
+    (spmap1.keySet ++ spmap2.keySet).foreach { k =>
+      map.put(k, math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)))
+    }
+    map
   }
 
   /**
```
/**

graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala

+1 -1

```diff
@@ -24,7 +24,7 @@ import org.apache.spark.graphx.util.GraphGenerators
 
 object GridPageRank {
   def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double): Seq[(VertexId, Double)] = {
-    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
+    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.ArrayBuffer.empty[Int])
     val outDegree = Array.fill(nRows * nCols)(0)
     // Convert row column address into vertex ids (row major order)
     def sub2ind(r: Int, c: Int): Int = r * nCols + c
```

mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala

+12 -10

```diff
@@ -194,14 +194,16 @@ private[spark] object RandomForest extends Logging with Serializable {
       training the same tree in the next iteration. This focus allows us to send fewer trees to
       workers on each iteration; see topNodesForGroup below.
      */
-    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+    val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
 
     val rng = new Random()
     rng.setSeed(seed)
 
     // Allocate and queue root nodes.
     val topNodes = Array.fill[LearningNode](numTrees)(LearningNode.emptyNode(nodeIndex = 1))
-    Range(0, numTrees).foreach(treeIndex => nodeStack.push((treeIndex, topNodes(treeIndex))))
+    for (treeIndex <- 0 until numTrees) {
+      nodeStack.prepend((treeIndex, topNodes(treeIndex)))
+    }
 
     timer.stop("init")
 
@@ -398,7 +400,7 @@ private[spark] object RandomForest extends Logging with Serializable {
       nodesForGroup: Map[Int, Array[LearningNode]],
       treeToNodeToIndexInfo: Map[Int, Map[Int, NodeIndexInfo]],
       splits: Array[Array[Split]],
-      nodeStack: mutable.ArrayStack[(Int, LearningNode)],
+      nodeStack: mutable.ListBuffer[(Int, LearningNode)],
       timer: TimeTracker = new TimeTracker,
       nodeIdCache: Option[NodeIdCache] = None): Unit = {
 
@@ -639,10 +641,10 @@ private[spark] object RandomForest extends Logging with Serializable {
 
         // enqueue left child and right child if they are not leaves
         if (!leftChildIsLeaf) {
-          nodeStack.push((treeIndex, node.leftChild.get))
+          nodeStack.prepend((treeIndex, node.leftChild.get))
         }
         if (!rightChildIsLeaf) {
-          nodeStack.push((treeIndex, node.rightChild.get))
+          nodeStack.prepend((treeIndex, node.rightChild.get))
         }
 
         logDebug("leftChildIndex = " + node.leftChild.get.id +
@@ -1042,8 +1044,8 @@ private[spark] object RandomForest extends Logging with Serializable {
       var partNumSamples = 0.0
       var unweightedNumSamples = 0.0
       featureSamples.foreach { case (sampleWeight, feature) =>
-        partValueCountMap(feature) = partValueCountMap.getOrElse(feature, 0.0) + sampleWeight;
-        partNumSamples += sampleWeight;
+        partValueCountMap(feature) = partValueCountMap.getOrElse(feature, 0.0) + sampleWeight
+        partNumSamples += sampleWeight
         unweightedNumSamples += 1.0
       }
 
@@ -1131,7 +1133,7 @@ private[spark] object RandomForest extends Logging with Serializable {
    * The feature indices are None if not subsampling features.
    */
   private[tree] def selectNodesToSplit(
-      nodeStack: mutable.ArrayStack[(Int, LearningNode)],
+      nodeStack: mutable.ListBuffer[(Int, LearningNode)],
       maxMemoryUsage: Long,
       metadata: DecisionTreeMetadata,
       rng: Random): (Map[Int, Array[LearningNode]], Map[Int, Map[Int, NodeIndexInfo]]) = {
@@ -1146,7 +1148,7 @@ private[spark] object RandomForest extends Logging with Serializable {
     // so we allow one iteration if memUsage == 0.
     var groupDone = false
     while (nodeStack.nonEmpty && !groupDone) {
-      val (treeIndex, node) = nodeStack.top
+      val (treeIndex, node) = nodeStack.head
       // Choose subset of features for node (if subsampling).
       val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) {
         Some(SamplingUtils.reservoirSampleAndCount(Range(0,
@@ -1157,7 +1159,7 @@ private[spark] object RandomForest extends Logging with Serializable {
       // Check if enough memory remains to add this node to the group.
       val nodeMemUsage = RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L
       if (memUsage + nodeMemUsage <= maxMemoryUsage || memUsage == 0) {
-        nodeStack.pop()
+        nodeStack.remove(0)
         mutableNodesForGroup.getOrElseUpdate(treeIndex, new mutable.ArrayBuffer[LearningNode]()) +=
           node
         mutableTreeToNodeToIndexInfo
```

mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala

+4 -6

```diff
@@ -40,8 +40,6 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   import RandomForestSuite.mapToVec
 
-  private val seed = 42
-
   /////////////////////////////////////////////////////////////////////////////
   // Tests for split calculation
   /////////////////////////////////////////////////////////////////////////////
@@ -350,7 +348,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val treeToNodeToIndexInfo = Map(0 -> Map(
       topNode.id -> new RandomForest.NodeIndexInfo(0, None)
     ))
-    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+    val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
     RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
       nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
 
@@ -392,7 +390,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val treeToNodeToIndexInfo = Map(0 -> Map(
       topNode.id -> new RandomForest.NodeIndexInfo(0, None)
    ))
-    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+    val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
     RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
       nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
 
@@ -505,11 +503,11 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
       val failString = s"Failed on test with:" +
         s"numTrees=$numTrees, featureSubsetStrategy=$featureSubsetStrategy," +
         s" numFeaturesPerNode=$numFeaturesPerNode, seed=$seed"
-      val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+      val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
       val topNodes: Array[LearningNode] = new Array[LearningNode](numTrees)
       Range(0, numTrees).foreach { treeIndex =>
         topNodes(treeIndex) = LearningNode.emptyNode(nodeIndex = 1)
-        nodeStack.push((treeIndex, topNodes(treeIndex)))
+        nodeStack.prepend((treeIndex, topNodes(treeIndex)))
       }
       val rng = new scala.util.Random(seed = seed)
       val (nodesForGroup: Map[Int, Array[LearningNode]],
```

mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala

+2 -2

```diff
@@ -57,8 +57,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
 
   test("number of features more than 65535") {
     val data1 = sc.parallelize(Array(
-      Vectors.dense((1 to 100000).map(_ => 2.0).to[scala.Vector].toArray),
-      Vectors.dense((1 to 100000).map(_ => 0.0).to[scala.Vector].toArray)
+      Vectors.dense(Array.fill(100000)(2.0)),
+      Vectors.dense(Array.fill(100000)(0.0))
     ), 2)
 
     val pca = new PCA(2).fit(data1)
```

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala

+1 -1

```diff
@@ -125,7 +125,7 @@ private[spark] object SparkAppLauncher extends Logging {
       appConf.toStringArray :+ appArguments.mainAppResource
 
     if (appArguments.appArgs.nonEmpty) {
-      commandLine ++= appArguments.appArgs.to[ArrayBuffer]
+      commandLine ++= appArguments.appArgs
     }
     logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
     ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala

+3 -3

```diff
@@ -41,7 +41,7 @@ class EquivalentExpressions {
   }
 
   // For each expression, the set of equivalent expressions.
-  private val equivalenceMap = mutable.HashMap.empty[Expr, mutable.MutableList[Expression]]
+  private val equivalenceMap = mutable.HashMap.empty[Expr, mutable.ArrayBuffer[Expression]]
 
   /**
    * Adds each expression to this data structure, grouping them with existing equivalent
@@ -56,7 +56,7 @@ class EquivalentExpressions {
       f.get += expr
       true
     } else {
-      equivalenceMap.put(e, mutable.MutableList(expr))
+      equivalenceMap.put(e, mutable.ArrayBuffer(expr))
       false
     }
   } else {
@@ -102,7 +102,7 @@ class EquivalentExpressions {
    * an empty collection if there are none.
    */
   def getEquivalentExprs(e: Expression): Seq[Expression] = {
-    equivalenceMap.getOrElse(Expr(e), mutable.MutableList())
+    equivalenceMap.getOrElse(Expr(e), Seq.empty)
   }
 
   /**
```
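
A small hypothetical sketch (no Catalyst types) of the pattern migrated in this file: a hash map whose values accumulate equivalent entries. `ArrayBuffer` is a drop-in for `MutableList` in this append-and-read usage, and returning `Seq.empty` preserves the "empty collection if there are none" contract of `getEquivalentExprs`.

```scala
import scala.collection.mutable

object EquivalenceSketch extends App {
  val groups = mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]

  // Add a value under a key; returns true if the key was already present.
  def add(key: Int, value: String): Boolean = groups.get(key) match {
    case Some(buf) =>
      buf += value
      true
    case None =>
      groups.put(key, mutable.ArrayBuffer(value))
      false
  }

  def get(key: Int): Seq[String] = groups.getOrElse(key, Seq.empty)

  add(1, "a"); add(1, "b"); add(2, "c")
  println(get(1))  // ArrayBuffer(a, b)
  println(get(3))  // List() (the empty default)
}
```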

0 commit comments
