From 0f6756caa110740f4101747e46b4adc2b14b1d77 Mon Sep 17 00:00:00 2001 From: Jonathan Kelly Date: Tue, 8 Dec 2020 14:27:15 -0800 Subject: [PATCH] Fix prewarming query planning unintentionally Before this change, logic in ExperimentStatus would cause query planning to be prewarmed as a side effect of capturing the current plan. This change removes this side effect, but adds an option to restore the previous behavior if desired by the user. --- .../databricks/spark/sql/perf/Benchmark.scala | 63 ++++++++++++++----- .../spark/sql/perf/Benchmarkable.scala | 15 +++-- .../sql/perf/BenchmarkableListener.scala | 10 +++ .../com/databricks/spark/sql/perf/Query.scala | 5 +- .../mllib/MLPipelineStageBenchmarkable.scala | 3 +- .../databricks/spark/sql/perf/results.scala | 4 +- 6 files changed, 74 insertions(+), 26 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/sql/perf/BenchmarkableListener.scala diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index ebb49353..ea8e5700 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -27,6 +27,7 @@ import scala.util.control.NonFatal import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, DataFrame, SQLContext, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.SparkContext import com.databricks.spark.sql.perf.cpu._ @@ -106,10 +107,22 @@ abstract class Benchmark( tags: Map[String, String] = Map.empty, timeout: Long = 0L, resultLocation: String = resultsLocation, - forkThread: Boolean = true) = { - - new ExperimentStatus(executionsToRun, includeBreakdown, iterations, variations, tags, - timeout, resultLocation, sqlContext, allTables, currentConfiguration, forkThread = forkThread) + forkThread: Boolean = true, + prewarmQueryPlanning: Boolean = false) = { + + new ExperimentStatus( + executionsToRun, + includeBreakdown, + iterations, + variations, + tags, + timeout, + resultLocation, + sqlContext, + allTables, + currentConfiguration, + forkThread = forkThread, + prewarmQueryPlanning = prewarmQueryPlanning) } @@ -240,7 +253,8 @@ abstract class Benchmark( protected override def doBenchmark( includeBreakdown: Boolean, description: String = "", - messages: ArrayBuffer[String]): BenchmarkResult = { + messages: ArrayBuffer[String], + listener: Option[BenchmarkableListener]): BenchmarkResult = { try { val timeMs = measureTimeMs(run()) BenchmarkResult( @@ -298,7 +312,8 @@ object Benchmark { sqlContext: SQLContext, allTables: Seq[Table], currentConfiguration: BenchmarkConfiguration, - forkThread: Boolean = true) { + forkThread: Boolean = true, + prewarmQueryPlanning: Boolean = false) { val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() val currentMessages = new collection.mutable.ArrayBuffer[String]() @@ -379,21 +394,15 @@ object Benchmark { logMessage(s"Running execution ${q.name} $setup") currentExecution = q.name - currentPlan = q match { - case query: Query => - try { - query.newDataFrame().queryExecution.executedPlan.toString() - } catch { - case e: Exception => - s"failed to parse: $e" - } - case _ => "" - } + currentPlan = "" // Clear previous plan if any + + prewarmQueryPlanningIfEnabled(q) + startTime = System.currentTimeMillis() val singleResultT = Try { q.benchmark(includeBreakdown, setup, currentMessages, timeout, - forkThread=forkThread) + forkThread=forkThread, listener = Some(queryListener)) } singleResultT match { @@ -444,6 +453,26 @@ object Benchmark { logCollection() } + private def prewarmQueryPlanningIfEnabled(benchmarkable: Benchmarkable): Unit = { + if (prewarmQueryPlanning) { + benchmarkable match { + case query: Query => query.newDataFrame().queryExecution.executedPlan + case _ => + } + } + } + + private val queryListener = new BenchmarkableListener { + + override def onQueryPlanned(plan: SparkPlan): Unit = { + currentPlan = try { + plan.toString + } catch { + case NonFatal(e) => s"failed to parse: $e" + } + } + } + def scheduleCpuCollection(fs: FS) = { logCollection = () => { logMessage(s"Begining CPU log collection") diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala index 24efef70..8ce7a269 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala @@ -43,14 +43,15 @@ trait Benchmarkable { description: String = "", messages: ArrayBuffer[String], timeout: Long, - forkThread: Boolean = true): BenchmarkResult = { + forkThread: Boolean = true, + listener: Option[BenchmarkableListener] = None): BenchmarkResult = { logger.info(s"$this: benchmark") sparkContext.setJobDescription(s"Execution: $name, $description") beforeBenchmark() val result = if (forkThread) { - runBenchmarkForked(includeBreakdown, description, messages, timeout) + runBenchmarkForked(includeBreakdown, description, messages, timeout, listener) } else { - doBenchmark(includeBreakdown, description, messages) + doBenchmark(includeBreakdown, description, messages, listener) } afterBenchmark(sqlContext.sparkContext) result @@ -66,7 +67,8 @@ trait Benchmarkable { includeBreakdown: Boolean, description: String = "", messages: ArrayBuffer[String], - timeout: Long): BenchmarkResult = { + timeout: Long, + listener: Option[BenchmarkableListener]): BenchmarkResult = { val jobgroup = UUID.randomUUID().toString val that = this var result: BenchmarkResult = null @@ -75,7 +77,7 @@ trait Benchmarkable { logger.info(s"$that running $this") sparkContext.setJobGroup(jobgroup, s"benchmark $name", true) try { - result = doBenchmark(includeBreakdown, description, messages) + result = doBenchmark(includeBreakdown, description, messages, listener) } catch { case e: Throwable => logger.info(s"$that: failure in runBenchmark: $e") @@ -107,7 +109,8 @@ trait Benchmarkable { protected def doBenchmark( includeBreakdown: Boolean, description: String = "", - messages: ArrayBuffer[String]): BenchmarkResult + messages: ArrayBuffer[String], + listener: Option[BenchmarkableListener]): BenchmarkResult protected def measureTimeMs[A](f: => A): Double = { val startTime = System.nanoTime() diff --git a/src/main/scala/com/databricks/spark/sql/perf/BenchmarkableListener.scala b/src/main/scala/com/databricks/spark/sql/perf/BenchmarkableListener.scala new file mode 100644 index 00000000..99aa7b24 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/BenchmarkableListener.scala @@ -0,0 +1,10 @@ +package com.databricks.spark.sql.perf + +import org.apache.spark.sql.execution.SparkPlan + +private[perf] trait BenchmarkableListener { + + /** Called after a query in a [[Benchmarkable]] is planned **/ + def onQueryPlanned(plan: SparkPlan): Unit = { + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/Query.scala b/src/main/scala/com/databricks/spark/sql/perf/Query.scala index babc63f0..e0bdc97b 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Query.scala @@ -62,7 +62,8 @@ class Query( protected override def doBenchmark( includeBreakdown: Boolean, description: String = "", - messages: ArrayBuffer[String]): BenchmarkResult = { + messages: ArrayBuffer[String], + listener: Option[BenchmarkableListener]): BenchmarkResult = { try { val dataFrame = buildDataFrame val queryExecution = dataFrame.queryExecution @@ -80,6 +81,8 @@ class Query( queryExecution.executedPlan } + listener.foreach(_.onQueryPlanned(queryExecution.executedPlan)) + val breakdownResults = if (includeBreakdown) { val depth = queryExecution.executedPlan.collect { case p: SparkPlan => p }.size val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan.p(i))) diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala index 8296f46b..88db967d 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala @@ -45,7 +45,8 @@ class MLPipelineStageBenchmarkable( override protected def doBenchmark( includeBreakdown: Boolean, description: String, - messages: ArrayBuffer[String]): BenchmarkResult = { + messages: ArrayBuffer[String], + listener: Option[BenchmarkableListener]): BenchmarkResult = { try { val (trainingTime, model: Transformer) = measureTime { logger.info(s"$this: train: trainingSet=${trainingData.schema}") diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index 28d72263..44b87573 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -26,13 +26,15 @@ import com.databricks.spark.sql.perf.mllib.ReflectionUtils * @param tags Tags of this iteration (variations are stored at here). * @param configuration Configuration properties of this iteration. * @param results The performance results of queries for this iteration. + * @param prewarmQueryPlanning Whether prewarm query planning was enabled. */ case class ExperimentRun( timestamp: Long, iteration: Int, tags: Map[String, String], configuration: BenchmarkConfiguration, - results: Seq[BenchmarkResult]) + results: Seq[BenchmarkResult], + prewarmQueryPlanning: Boolean = false) /** * The configuration used for an iteration of an experiment.