From 829ed201a33840743ad0afafd7f0affdb182d1db Mon Sep 17 00:00:00 2001
From: Peach-He
Date: Thu, 28 Nov 2019 14:03:29 +0800
Subject: [PATCH 1/2] TPCDS setup support for spark 3.0.0

---
Reviewer notes (this section is ignored by `git am`):
  * Fixed added lines that did not compile: missing ':' in the `Tables`
    constructor parameter `sqlSession`, and missing ',' after
    `sqlSession: SparkSession` in `TPCDSTables` and `TPCHTables`.
  * `SparkSession` has no `createTable` member; the call now goes through
    `sqlSession.catalog.createTable(tableName, path, source)`.
  * Context-line indentation was reconstructed from a whitespace-collapsed
    copy of this patch. If context fails to match, apply with
    `git am --ignore-whitespace`.

 src/main/notebooks/TPC-multi_datagen.scala                | 6 +++---
 src/main/scala/com/databricks/spark/sql/perf/Tables.scala | 6 +++---
 .../com/databricks/spark/sql/perf/tpcds/TPCDSTables.scala | 5 +++--
 .../scala/com/databricks/spark/sql/perf/tpch/TPCH.scala   | 5 +++--
 4 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/src/main/notebooks/TPC-multi_datagen.scala b/src/main/notebooks/TPC-multi_datagen.scala
index eef98f45..22639990 100644
--- a/src/main/notebooks/TPC-multi_datagen.scala
+++ b/src/main/notebooks/TPC-multi_datagen.scala
@@ -145,17 +145,17 @@ def getBenchmarkData(benchmark: String, scaleFactor: String) = benchmark match {
 
   case "TPCH" => (
     s"tpch_sf${scaleFactor}_${fileFormat}${dbSuffix}",
-    new TPCHTables(spark.sqlContext, dbgenDir = s"${baseDatagenFolder}/dbgen", scaleFactor = scaleFactor, useDoubleForDecimal = false, useStringForDate = false, generatorParams = Nil),
+    new TPCHTables(spark, spark.sqlContext, dbgenDir = s"${baseDatagenFolder}/dbgen", scaleFactor = scaleFactor, useDoubleForDecimal = false, useStringForDate = false, generatorParams = Nil),
     s"$baseLocation/tpch/sf${scaleFactor}_${fileFormat}")
 
   case "TPCDS" if !TPCDSUseLegacyOptions => (
     s"tpcds_sf${scaleFactor}_${fileFormat}${dbSuffix}",
-    new TPCDSTables(spark.sqlContext, dsdgenDir = s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor, useDoubleForDecimal = false, useStringForDate = false),
+    new TPCDSTables(spark, spark.sqlContext, dsdgenDir = s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor, useDoubleForDecimal = false, useStringForDate = false),
     s"$baseLocation/tpcds-2.4/sf${scaleFactor}_${fileFormat}")
 
   case "TPCDS" if TPCDSUseLegacyOptions => (
     s"tpcds_sf${scaleFactor}_nodecimal_nodate_withnulls${dbSuffix}",
-    new TPCDSTables(spark.sqlContext, s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor, useDoubleForDecimal = true, useStringForDate = true),
+    new TPCDSTables(spark, spark.sqlContext, s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor, useDoubleForDecimal = true, useStringForDate = true),
     s"$baseLocation/tpcds/sf$scaleFactor-$fileFormat/useDecimal=false,useDate=false,filterNull=false")
 }
 
diff --git a/src/main/scala/com/databricks/spark/sql/perf/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/Tables.scala
index 177d38ce..fd1b40e3 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/Tables.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/Tables.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SQLContext, SaveMode}
+import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
 
 
 /**
@@ -94,7 +94,7 @@ trait DataGenerator extends Serializable {
 }
 
 
-abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
+abstract class Tables(sqlSession: SparkSession, sqlContext: SQLContext, scaleFactor: String,
     useDoubleForDecimal: Boolean = false, useStringForDate: Boolean = false)
     extends Serializable {
 
@@ -254,7 +254,7 @@ abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
       if (!tableExists || overwrite) {
         println(s"Creating external table $name in database $databaseName using data stored in $location.")
         log.info(s"Creating external table $name in database $databaseName using data stored in $location.")
-        sqlContext.createExternalTable(qualifiedTableName, location, format)
+        sqlSession.catalog.createTable(qualifiedTableName, location, format)
       }
       if (partitionColumns.nonEmpty && discoverPartitions) {
         println(s"Discovering partitions for table $name.")
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDSTables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDSTables.scala
index 8243cd34..93028d3d 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDSTables.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDSTables.scala
@@ -22,7 +22,7 @@ import com.databricks.spark.sql.perf
 import com.databricks.spark.sql.perf.{BlockingLineStream, DataGenerator, Table, Tables}
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
 
 class DSDGEN(dsdgenDir: String) extends DataGenerator {
   val dsdgen = s"$dsdgenDir/dsdgen"
@@ -55,12 +55,13 @@ class DSDGEN(dsdgenDir: String) extends DataGenerator {
 
 
 class TPCDSTables(
+    sqlSession: SparkSession,
     sqlContext: SQLContext,
     dsdgenDir: String,
     scaleFactor: String,
     useDoubleForDecimal: Boolean = false,
     useStringForDate: Boolean = false)
-  extends Tables(sqlContext, scaleFactor, useDoubleForDecimal, useStringForDate) {
+  extends Tables(sqlSession, sqlContext, scaleFactor, useDoubleForDecimal, useStringForDate) {
   import sqlContext.implicits._
 
   val dataGenerator = new DSDGEN(dsdgenDir)
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala b/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala
index 5a23edf9..63d64128 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala
@@ -22,7 +22,7 @@ import com.databricks.spark.sql.perf.ExecutionMode.CollectResults
 import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
 
 class DBGEN(dbgenDir: String, params: Seq[String]) extends DataGenerator {
   val dbgen = s"$dbgenDir/dbgen"
@@ -64,13 +64,14 @@ class DBGEN(dbgenDir: String, params: Seq[String]) extends DataGenerator {
 }
 
 class TPCHTables(
+    sqlSession: SparkSession,
     sqlContext: SQLContext,
     dbgenDir: String,
     scaleFactor: String,
     useDoubleForDecimal: Boolean = false,
     useStringForDate: Boolean = false,
     generatorParams: Seq[String] = Nil)
-    extends Tables(sqlContext, scaleFactor, useDoubleForDecimal, useStringForDate) {
+    extends Tables(sqlSession, sqlContext, scaleFactor, useDoubleForDecimal, useStringForDate) {
   import sqlContext.implicits._
 
   val dataGenerator = new DBGEN(dbgenDir, generatorParams)

From 6b40d316ec4642230cbb2cfd59cd49d6c3ea291f Mon Sep 17 00:00:00 2001
From: Peach-He
Date: Thu, 28 Nov 2019 14:09:34 +0800
Subject: [PATCH 2/2] update README for spark3 TPC-DS setup

---
 README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 335b0e64..b0d6cf29 100644
--- a/README.md
+++ b/README.md
@@ -75,7 +75,7 @@ val databaseName = ... // name of database to create.
 val scaleFactor = ... // scaleFactor defines the size of the dataset to generate (in GB).
 val format = ... // valid spark format like parquet "parquet".
 // Run:
-val tables = new TPCDSTables(sqlContext,
+val tables = new TPCDSTables(spark, sqlContext,
     dsdgenDir = "/tmp/tpcds-kit/tools", // location of dsdgen
     scaleFactor = scaleFactor,
     useDoubleForDecimal = false, // true to replace DecimalType with DoubleType
@@ -188,4 +188,4 @@ For running parallel TPCDS streams:
 
 ### tpch_run notebook
 
-This notebook can be used to run TPCH queries. Data needs be generated first.
\ No newline at end of file
+This notebook can be used to run TPCH queries. Data needs be generated first.