Fix unintentional prewarming of query planning #193

Open · wants to merge 1 commit into master
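In short: the experiment runner no longer forces query planning up front just to capture a plan string, which previously prewarmed planning for every query whether or not that was wanted. Prewarming is now opt-in via a new prewarmQueryPlanning flag, and the executed plan is reported back through a new BenchmarkableListener during the benchmark itself. A hypothetical usage sketch follows, assuming the enclosing method is Benchmark.runExperiment (its name sits outside the hunk below); benchmark and queries are illustrative stand-ins:

  // Hypothetical sketch, not part of the diff; construction of the benchmark is not shown.
  val benchmark: Benchmark = ???                 // e.g. a TPC-DS benchmark instance
  val queries: Seq[Benchmarkable] = ???          // the executions to run

  val experiment = benchmark.runExperiment(      // method name assumed; see note above
    executionsToRun = queries,
    iterations = 3,
    prewarmQueryPlanning = true)                 // new flag: plan each query once before its timed run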
63 changes: 46 additions & 17 deletions src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.SparkContext

import com.databricks.spark.sql.perf.cpu._
@@ -106,10 +107,22 @@ abstract class Benchmark(
tags: Map[String, String] = Map.empty,
timeout: Long = 0L,
resultLocation: String = resultsLocation,
forkThread: Boolean = true) = {

new ExperimentStatus(executionsToRun, includeBreakdown, iterations, variations, tags,
timeout, resultLocation, sqlContext, allTables, currentConfiguration, forkThread = forkThread)
forkThread: Boolean = true,
prewarmQueryPlanning: Boolean = false) = {

new ExperimentStatus(
executionsToRun,
includeBreakdown,
iterations,
variations,
tags,
timeout,
resultLocation,
sqlContext,
allTables,
currentConfiguration,
forkThread = forkThread,
prewarmQueryPlanning = prewarmQueryPlanning)
}


@@ -240,7 +253,8 @@ abstract class Benchmark(
protected override def doBenchmark(
includeBreakdown: Boolean,
description: String = "",
messages: ArrayBuffer[String]): BenchmarkResult = {
messages: ArrayBuffer[String],
listener: Option[BenchmarkableListener]): BenchmarkResult = {
try {
val timeMs = measureTimeMs(run())
BenchmarkResult(
@@ -298,7 +312,8 @@ object Benchmark {
sqlContext: SQLContext,
allTables: Seq[Table],
currentConfiguration: BenchmarkConfiguration,
forkThread: Boolean = true) {
forkThread: Boolean = true,
prewarmQueryPlanning: Boolean = false) {
val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
val currentMessages = new collection.mutable.ArrayBuffer[String]()
@@ -379,21 +394,15 @@ object Benchmark {
logMessage(s"Running execution ${q.name} $setup")

currentExecution = q.name
currentPlan = q match {
case query: Query =>
try {
query.newDataFrame().queryExecution.executedPlan.toString()
} catch {
case e: Exception =>
s"failed to parse: $e"
}
case _ => ""
}
currentPlan = "" // Clear previous plan if any

prewarmQueryPlanningIfEnabled(q)

startTime = System.currentTimeMillis()

val singleResultT = Try {
q.benchmark(includeBreakdown, setup, currentMessages, timeout,
forkThread=forkThread)
forkThread=forkThread, listener = Some(queryListener))
}

singleResultT match {
@@ -444,6 +453,26 @@ object Benchmark {
logCollection()
}

private def prewarmQueryPlanningIfEnabled(benchmarkable: Benchmarkable): Unit = {
if (prewarmQueryPlanning) {
benchmarkable match {
case query: Query => query.newDataFrame().queryExecution.executedPlan
case _ =>
}
}
}

private val queryListener = new BenchmarkableListener {

override def onQueryPlanned(plan: SparkPlan): Unit = {
currentPlan = try {
plan.toString
} catch {
case NonFatal(e) => s"failed to parse: $e"
}
}
}

def scheduleCpuCollection(fs: FS) = {
logCollection = () => {
logMessage(s"Begining CPU log collection")
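For readability, the new run-loop ordering from the hunk above is restated here with explanatory comments; every identifier is copied from the diff, and this adds no code to the PR:

  currentPlan = ""                        // clear the previous plan; nothing is planned eagerly anymore
  prewarmQueryPlanningIfEnabled(q)        // opt-in and untimed: forces executedPlan once when enabled
  startTime = System.currentTimeMillis()  // timing starts only after any prewarming
  val singleResultT = Try {
    q.benchmark(includeBreakdown, setup, currentMessages, timeout,
      forkThread = forkThread, listener = Some(queryListener))
  }
  // queryListener.onQueryPlanned later fills currentPlan with the plan that was actually executed.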
15 changes: 9 additions & 6 deletions src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala
@@ -43,14 +43,15 @@ trait Benchmarkable {
description: String = "",
messages: ArrayBuffer[String],
timeout: Long,
forkThread: Boolean = true): BenchmarkResult = {
forkThread: Boolean = true,
listener: Option[BenchmarkableListener] = None): BenchmarkResult = {
logger.info(s"$this: benchmark")
sparkContext.setJobDescription(s"Execution: $name, $description")
beforeBenchmark()
val result = if (forkThread) {
runBenchmarkForked(includeBreakdown, description, messages, timeout)
runBenchmarkForked(includeBreakdown, description, messages, timeout, listener)
} else {
doBenchmark(includeBreakdown, description, messages)
doBenchmark(includeBreakdown, description, messages, listener)
}
afterBenchmark(sqlContext.sparkContext)
result
@@ -66,7 +67,8 @@
includeBreakdown: Boolean,
description: String = "",
messages: ArrayBuffer[String],
timeout: Long): BenchmarkResult = {
timeout: Long,
listener: Option[BenchmarkableListener]): BenchmarkResult = {
val jobgroup = UUID.randomUUID().toString
val that = this
var result: BenchmarkResult = null
@@ -75,7 +77,7 @@
logger.info(s"$that running $this")
sparkContext.setJobGroup(jobgroup, s"benchmark $name", true)
try {
result = doBenchmark(includeBreakdown, description, messages)
result = doBenchmark(includeBreakdown, description, messages, listener)
} catch {
case e: Throwable =>
logger.info(s"$that: failure in runBenchmark: $e")
@@ -107,7 +109,8 @@
protected def doBenchmark(
includeBreakdown: Boolean,
description: String = "",
messages: ArrayBuffer[String]): BenchmarkResult
messages: ArrayBuffer[String],
listener: Option[BenchmarkableListener]): BenchmarkResult

protected def measureTimeMs[A](f: => A): Double = {
val startTime = System.nanoTime()
10 changes: 10 additions & 0 deletions src/main/scala/com/databricks/spark/sql/perf/BenchmarkableListener.scala
@@ -0,0 +1,10 @@
package com.databricks.spark.sql.perf

import org.apache.spark.sql.execution.SparkPlan

private[perf] trait BenchmarkableListener {

/** Called after a query in a [[Benchmarkable]] is planned. */
def onQueryPlanned(plan: SparkPlan): Unit = {
}
}
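onQueryPlanned has a no-op default, so existing Benchmarkable implementations compile unchanged. A hypothetical listener that records every planned query, shown for illustration only (PlanRecordingListener is not part of the PR):

  package com.databricks.spark.sql.perf

  import scala.collection.mutable.ArrayBuffer

  import org.apache.spark.sql.execution.SparkPlan

  // Hypothetical example: keep the plan strings around so they can be inspected after a run.
  private[perf] class PlanRecordingListener extends BenchmarkableListener {
    val plans = ArrayBuffer.empty[String]

    override def onQueryPlanned(plan: SparkPlan): Unit = {
      plans += plan.toString
    }
  }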
5 changes: 4 additions & 1 deletion src/main/scala/com/databricks/spark/sql/perf/Query.scala
@@ -62,7 +62,8 @@ class Query(
protected override def doBenchmark(
includeBreakdown: Boolean,
description: String = "",
messages: ArrayBuffer[String]): BenchmarkResult = {
messages: ArrayBuffer[String],
listener: Option[BenchmarkableListener]): BenchmarkResult = {
try {
val dataFrame = buildDataFrame
val queryExecution = dataFrame.queryExecution
@@ -80,6 +81,8 @@
queryExecution.executedPlan
}

listener.foreach(_.onQueryPlanned(queryExecution.executedPlan))

val breakdownResults = if (includeBreakdown) {
val depth = queryExecution.executedPlan.collect { case p: SparkPlan => p }.size
val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan.p(i)))
@@ -45,7 +45,8 @@ class MLPipelineStageBenchmarkable(
override protected def doBenchmark(
includeBreakdown: Boolean,
description: String,
messages: ArrayBuffer[String]): BenchmarkResult = {
messages: ArrayBuffer[String],
listener: Option[BenchmarkableListener]): BenchmarkResult = {
try {
val (trainingTime, model: Transformer) = measureTime {
logger.info(s"$this: train: trainingSet=${trainingData.schema}")
4 changes: 3 additions & 1 deletion src/main/scala/com/databricks/spark/sql/perf/results.scala
@@ -26,13 +26,15 @@ import com.databricks.spark.sql.perf.mllib.ReflectionUtils
* @param tags Tags of this iteration (variations are stored here).
* @param configuration Configuration properties of this iteration.
* @param results The performance results of queries for this iteration.
* @param prewarmQueryPlanning Whether query planning prewarming was enabled.
*/
case class ExperimentRun(
timestamp: Long,
iteration: Int,
tags: Map[String, String],
configuration: BenchmarkConfiguration,
results: Seq[BenchmarkResult])
results: Seq[BenchmarkResult],
prewarmQueryPlanning: Boolean = false)

/**
* The configuration used for an iteration of an experiment.
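Because ExperimentRun now records the flag, downstream analysis can separate prewarmed iterations from cold ones. A hypothetical sketch; how the runs are loaded from the result location is not shown and not part of this change:

  // Hypothetical: `runs` would come from reading the experiment's persisted results.
  val runs: Seq[ExperimentRun] = ???
  val (prewarmed, cold) = runs.partition(_.prewarmQueryPlanning)
  println(s"${prewarmed.size} prewarmed iterations, ${cold.size} cold iterations")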