
Commit 35952cb

wenxuanguan authored and dongjoon-hyun committed
[SPARK-27859][SS] Use efficient sorting instead of .sorted.reverse sequence
## What changes were proposed in this pull request?

Sort descending in HDFSMetadataLog.getLatest in a single pass, instead of the two-step sequence of an ascending sort followed by a reverse.

## How was this patch tested?

Jenkins

Closes apache#24711 from wenxuanguan/bug-fix-hdfsmetadatalog.

Authored-by: wenxuanguan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 826ee60 commit 35952cb
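
For context, a minimal Scala sketch (not part of this patch; the object and value names are hypothetical) illustrating the pattern the commit changes: sorting with a reversed Ordering yields descending order in a single sort, whereas .sorted.reverse sorts ascending and then builds a second, reversed copy of the sequence.

// Hypothetical, self-contained example (not from the Spark codebase).
object ReverseSortSketch {
  def main(args: Array[String]): Unit = {
    val batchIds: Seq[Long] = Seq(3L, 1L, 2L)

    // Before: sort ascending, then materialize a second, reversed copy.
    val before = batchIds.sorted.reverse

    // After: sort once with a descending Ordering; no extra reversed copy.
    val after = batchIds.sorted(Ordering.Long.reverse)

    assert(before == after)        // both yield Seq(3L, 2L, 1L)
    println(after.mkString(", "))  // prints: 3, 2, 1
  }
}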

File tree

3 files changed: +3 −5 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala

+1 −2

@@ -85,8 +85,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     val compactibleBatchIds = fileManager.list(metadataPath, batchFilesFilter)
       .filter(f => f.getPath.toString.endsWith(CompactibleFileStreamLog.COMPACT_FILE_SUFFIX))
       .map(f => pathToBatchId(f.getPath))
-      .sorted
-      .reverse
+      .sorted(Ordering.Long.reverse)

     // Case 1
     var interval = defaultCompactInterval

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

+1 −2

@@ -177,8 +177,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   override def getLatest(): Option[(Long, T)] = {
     val batchIds = fileManager.list(metadataPath, batchFilesFilter)
       .map(f => pathToBatchId(f.getPath))
-      .sorted
-      .reverse
+      .sorted(Ordering.Long.reverse)
     for (batchId <- batchIds) {
       val batch = get(batchId)
       if (batch.isDefined) {

streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala

+1 −1

@@ -104,7 +104,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         }
       }
     val lastFailureReason =
-      sparkJob.stageIds.sorted.reverse.flatMap(getStageData).
+      sparkJob.stageIds.sorted(Ordering.Int.reverse).flatMap(getStageData).
         dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
         flatMap(info => info.failureReason).headOption.getOrElse("")
     val formattedDuration = duration.map(d => SparkUIUtils.formatDuration(d)).getOrElse("-")
