
Commit

Change error for Warning, add assertion in RowUtils, rollback test changes
osopardo1 committed Jan 13, 2025
1 parent d351eb3 commit 21ebb0c
Showing 13 changed files with 294 additions and 149 deletions.
@@ -50,6 +50,8 @@ case class LinearTransformer(columnName: String, dataType: QDataType) extends Transformer
}
}

override def shouldBeDeterministic: Boolean = true

override def stats: ColumnStats =
ColumnStats(
names = Seq(colMax, colMin),
@@ -156,6 +156,12 @@ trait Transformer extends Serializable {

def spec: String = s"$columnName:${transformerType.transformerSimpleName}"

/**
* Returns whether the transformation must have deterministic values at indexing time
* @return true if the indexed column must come from a deterministic source, false otherwise
*/
def shouldBeDeterministic: Boolean = false

}

trait ColumnStats extends Serializable {
36 changes: 22 additions & 14 deletions core/src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala
@@ -218,20 +218,6 @@ object DoublePassOTreeDataAnalyzer
options: QbeastOptions): (DataFrame, TableChanges) = {
logTrace(s"Begin: Analyzing the input data with existing revision: ${indexStatus.revision}")

// Check if the DataFrame is deterministic
logDebug(s"Checking the determinism of the input data")
val isSourceDeterministic =
analyzeDataFrameDeterminism(dataFrame, indexStatus.revision)
// TODO: we need to add columnStats control before the assert
// TODO: Otherwise, the write would fail even if the user adds the correct configuration
assert(
isSourceDeterministic,
s"The source query is non-deterministic. " +
s"Due to Qbeast-Spark write nature, we load the DataFrame twice before writing to storage." +
s"It is required to have deterministic sources and deterministic columns to index " +
s"to preserve the state of the indexing pipeline. " +
s"If it is not the case, please save the DF as delta and Convert it To Qbeast in a second step")

// Compute the changes in the space: cube size, transformers, and transformations.
val (revisionChanges, numElements) =
computeRevisionChanges(indexStatus.revision, options, dataFrame)
@@ -241,6 +227,28 @@
}
logDebug(s"revisionToUse=$revisionToUse")

// Check if the DataFrame and the Columns To Index are deterministic
val deterministicColumnsToAnalyze =
revisionToUse.columnTransformers.filter(_.shouldBeDeterministic).map(_.columnName)
if (deterministicColumnsToAnalyze.nonEmpty) {
logDebug(
s"Some columnsToIndex need to come from a Deterministic Source: {${deterministicColumnsToAnalyze
.mkString(",")}}. Checking the determinism of the input data")
val isDataFrameDeterministic =
analyzeDataFrameDeterminism(dataFrame, deterministicColumnsToAnalyze)
if (!isDataFrameDeterministic) {
logWarning(
s"The source query is non-deterministic. " +
s"Due to the Qbeast-Spark write nature, the DataFrame is loaded twice before writing to storage. " +
s"Deterministic sources and deterministic columns to index are required " +
s"to preserve the state of the indexing pipeline. " +
s"If that is not the case, you can: " +
s"1. Change the transformer type to quantiles. " +
s"2. Add columnStats with a greater space range to avoid indexing errors. " +
s"3. Save the DF as delta and convert it to Qbeast in a second step.")
}
}

// Add a random weight column
val weightedDataFrame = dataFrame.transform(addRandomWeight(revisionToUse))

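
As a side note on suggestion 2 in the warning above, here is a minimal sketch of how a caller could widen the indexing space up front. The `columnStats` option name and the `_min`/`_max` JSON keys are an assumption for illustration, not something this diff defines:

```scala
// Hedged sketch (assumed option name and JSON keys): declare a wider space for the
// indexed column so values from a changing source still map inside [0, 1].
df.write
  .format("qbeast")
  .option("columnsToIndex", "price")
  .option("columnStats", """{"price_min": 0.0, "price_max": 100000.0}""")
  .save("/tmp/qbeast/widened-space")
```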
12 changes: 11 additions & 1 deletion core/src/main/scala/io/qbeast/spark/index/RowUtils.scala
@@ -44,7 +44,17 @@ object RowUtils {
var i = 0
for (t <- revision.transformations) {
val v = row.get(i)
coordinates += t.transform(v)
val transformation = t.transform(v)
assert(
transformation >= 0.0 && transformation <= 1.0,
s"Value $v is out of bounds of the Transformation. This can happen if you are indexing a Dataframe that:" +
s"1. It's source is constantly changing." +
s"2. The query that produces the DataFrame is not deterministic." +
s"To avoid this problem, we suggest to: " +
s"1. Change the transformer type to quantiles." +
s"2. Add columnStats with a greater space range to avoid indexing errors." +
s"3. save the DF as delta and Convert it To Qbeast in a second step")
coordinates += transformation
i += 1

}
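
To make the new bound check concrete, a small sketch of the failure mode it guards against, assuming the usual linear mapping `(v - min) / (max - min)` (consistent with the `RowUtilsTest` expectations added below):

```scala
// Sketch only: a value outside the revision's recorded space maps outside [0, 1]
// and now trips the assertion instead of producing an invalid coordinate.
val (min, max) = (0.0, 10.0)
def linear(v: Double): Double = (v - min) / (max - min)

linear(1.0)  // 0.1 -> valid coordinate
linear(20.0) // 2.0 -> out of bounds: the source grew past the recorded max
```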
25 changes: 14 additions & 11 deletions core/src/main/scala/io/qbeast/spark/index/SparkPlanAnalyzer.scala
@@ -1,6 +1,5 @@
package io.qbeast.spark.index

import io.qbeast.core.model.Revision
import io.qbeast.spark.internal.rules.QbeastRelation
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -89,29 +88,33 @@ trait SparkPlanAnalyzer {
* Analyzes the DataFrame to determine whether its execution is safely deterministic for indexing
*
* - The logical plan of the DataFrame is checked for determinism
* - The columns to index are checked for determinism
* - The columns to analyze (to index) are checked for determinism
*
* @param dataFrame
* the DataFrame to analyze
* @param revision
* the Revision to analyze
* @param columnsToAnalyze
* the columns to analyze for determinism
* @return
*/
def analyzeDataFrameDeterminism(dataFrame: DataFrame, revision: Revision): Boolean = {
def analyzeDataFrameDeterminism(
dataFrame: DataFrame,
columnsToAnalyze: Seq[String]): Boolean = {
// Access the logical plan of the DataFrame
val logicalPlan: LogicalPlan = dataFrame.queryExecution.logical

// Check if the logical plan's query is deterministic
// Detect if the DataFrame's operations are deterministic
val isQueryDeterministic: Boolean = isLogicalPlanDeterministic(logicalPlan)
val isPlanDeterministic: Boolean = isLogicalPlanDeterministic(logicalPlan)

// Check if any of the columns to index in the DataFrame is deterministic
val columnsToIndex = revision.columnTransformers.map(_.columnName)
val areColumnsToIndexDeterministic: Boolean =
columnsToIndex.forall(column => isColumnDeterministic(logicalPlan, column))
// Verify whether all columns required to have a deterministic nature are indeed deterministic
val areColumnsToAnalyzeDeterministic: Boolean =
if (columnsToAnalyze.isEmpty) true // If no columns are provided, return true
else {
columnsToAnalyze.forall(columnName => isColumnDeterministic(logicalPlan, columnName))
}

// Check if the source is deterministic
isQueryDeterministic && areColumnsToIndexDeterministic
isPlanDeterministic && areColumnsToAnalyzeDeterministic
}

}
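
The helpers referenced above (`isLogicalPlanDeterministic`, `isColumnDeterministic`) are not part of this diff. A hedged sketch of one plausible shape: treat a plan as non-deterministic if it samples or truncates rows, or if any node carries a non-deterministic expression. Per the tests below, the real analyzer additionally treats a Sample over a Qbeast relation as deterministic.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LocalLimit, LogicalPlan, Sample}

// Sketch only, not the implementation in this commit.
def isLogicalPlanDeterministic(plan: LogicalPlan): Boolean =
  plan.find {
    case _: Sample | _: LocalLimit | _: GlobalLimit => true // rows may change across executions
    case node => node.expressions.exists(e => !e.deterministic) // e.g. rand(), non-deterministic UDFs
  }.isEmpty
```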
@@ -130,7 +130,9 @@ class ConvertToQbeastDeltaTest

it should "not create new revisions for a qbeast table" in withSparkAndTmpDir(
(spark, tmpDir) => {
loadTestData(spark).write
loadTestData(spark)
.limit(dataSize)
.write
.format("qbeast")
.option("columnsToIndex", columnsToIndex.mkString(","))
.option("cubeSize", dcs)
@@ -187,7 +189,9 @@ class ConvertToQbeastDeltaTest
convertFromFormat(spark, "parquet", tmpDir)

// Append qbeast data
loadTestData(spark).write
loadTestData(spark)
.limit(dataSize)
.write
.mode("append")
.format("qbeast")
.save(tmpDir)
@@ -67,10 +67,10 @@ class QbeastSnapshotDeltaTest extends QbeastIntegrationTestSpec {
"Appends" should "only update metadata when needed" in
withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
{
val df = createDF(100)
val df = loadTestData(spark).limit(5000)
df.write
.format("qbeast")
.option("columnsToIndex", "age,val2")
.option("columnsToIndex", "user_id,price")
.save(tmpDir)
df.write.mode("append").format("qbeast").save(tmpDir)

111 changes: 111 additions & 0 deletions src/test/scala/io/qbeast/spark/index/RowUtilsTest.scala
@@ -0,0 +1,111 @@
package io.qbeast.spark.index

import io.qbeast.core.model.DoubleDataType
import io.qbeast.core.model.Point
import io.qbeast.core.model.QTableID
import io.qbeast.core.model.Revision
import io.qbeast.core.model.StringDataType
import io.qbeast.core.transform.CDFNumericQuantilesTransformation
import io.qbeast.core.transform.CDFQuantilesTransformer
import io.qbeast.core.transform.HashTransformation
import io.qbeast.core.transform.HashTransformer
import io.qbeast.core.transform.LinearTransformation
import io.qbeast.core.transform.LinearTransformer
import io.qbeast.QbeastIntegrationTestSpec
import org.apache.spark.sql.AnalysisException

class RowUtilsTest extends QbeastIntegrationTestSpec {

"RowUtils" should "transform a row using LinearTransformation" in withQbeastContextSparkAndTmpWarehouse {
(spark, _) =>
import spark.implicits._
val df = Seq((1.0, "a")).toDF("id", "name")
val revision = Revision(
12L,
12L,
QTableID("test"),
100,
Vector(LinearTransformer("id", DoubleDataType)),
List(LinearTransformation(0.0, 10.0, 5.0, DoubleDataType)))
val row = df.head
val transformedRow = RowUtils.rowValuesToPoint(row, revision)
transformedRow shouldBe Point(Vector(0.1))

}

it should "transform a row using HashTransformation" in withQbeastContextSparkAndTmpWarehouse {
(spark, _) =>
import spark.implicits._
val df = Seq(("1", "a")).toDF("id", "name")
val revision = Revision(
12L,
12L,
QTableID("test"),
100,
Vector(HashTransformer("id", StringDataType)),
List(HashTransformation("null")))
val row = df.head
val transformedRow = RowUtils.rowValuesToPoint(row, revision)
transformedRow shouldBe Point(Vector(0.24913018394686756))

}

it should "transform a row using quantiles" in withQbeastContextSparkAndTmpWarehouse {
(spark, _) =>
import spark.implicits._
val df = Seq((1.0, "a")).toDF("id", "name")
val revision = Revision(
12L,
12L,
QTableID("test"),
100,
Vector(CDFQuantilesTransformer("id", DoubleDataType)),
List(CDFNumericQuantilesTransformation(Array(0.0, 2.0), DoubleDataType)))
val row = df.head
val transformedRow = RowUtils.rowValuesToPoint(row, revision)
transformedRow shouldBe Point(Vector(0.5))
}

it should "throw an error when values are out of max bound" in withQbeastContextSparkAndTmpWarehouse {
(spark, _) =>
import spark.implicits._
val df = Seq((20.0, "a")).toDF("id", "name")
val revision = Revision(
12L,
12L,
QTableID("test"),
100,
Vector(LinearTransformer("id", DoubleDataType)),
List(LinearTransformation(0.0, 10.0, 5.0, DoubleDataType)))
val row = df.head
an[AssertionError] shouldBe thrownBy(RowUtils.rowValuesToPoint(row, revision))

}

it should "throw an error when values are out of min bound" in withQbeastContextSparkAndTmpWarehouse {
(spark, _) =>
import spark.implicits._
val df = Seq((-1.0, "a")).toDF("id", "name")
val revision = Revision(
12L,
12L,
QTableID("test"),
100,
Vector(LinearTransformer("id", DoubleDataType)),
List(LinearTransformation(0.0, 10.0, 5.0, DoubleDataType)))
val row = df.head
an[AssertionError] shouldBe thrownBy(RowUtils.rowValuesToPoint(row, revision))

}

it should "throw an error when Transformations are empty" in withQbeastContextSparkAndTmpWarehouse {
(spark, _) =>
import spark.implicits._
val df = Seq((-1.0, "a")).toDF("id", "name")
val revision = Revision(12L, 12L, QTableID("test"), 100, Vector.empty, List.empty)
val row = df.head
an[AnalysisException] shouldBe thrownBy(RowUtils.rowValuesToPoint(row, revision))

}

}
99 changes: 99 additions & 0 deletions src/test/scala/io/qbeast/spark/index/SparkPlanAnalyzerTest.scala
@@ -0,0 +1,99 @@
package io.qbeast.spark.index

import io.qbeast.QbeastIntegrationTestSpec

class SparkPlanAnalyzerTest extends QbeastIntegrationTestSpec {

object SparkPlanAnalyzerTesting extends SparkPlanAnalyzer

private lazy val nonDeterministicUDF = org.apache.spark.sql.functions
.udf(() => {
scala.util.Random.nextInt()
})
.asNondeterministic()

"SparkPlanAnalyzer" should "detect underterminism in LIMIT" in withSparkAndTmpDir {
(spark, _) =>
val df = spark
.range(10)
.toDF("id")
.limit(5)
val columnsToAnalyze = Seq("id")
val isDeterministic =
SparkPlanAnalyzerTesting.analyzeDataFrameDeterminism(df, columnsToAnalyze)
isDeterministic shouldBe false
}

it should "detect undeterminism in SAMPLE" in withSparkAndTmpDir { (spark, _) =>
val df = spark
.range(10)
.toDF("id")
.sample(0.1)
val columnsToAnalyze = Seq("id")
val isDeterministic =
SparkPlanAnalyzerTesting.analyzeDataFrameDeterminism(df, columnsToAnalyze)
isDeterministic shouldBe false
}

it should "detect non-determinism in non-deterministic columns" in withSparkAndTmpDir {
(spark, _) =>
val df = spark
.range(10)
.withColumn("non_deterministic_col", nonDeterministicUDF())

val columnsToAnalyze = Seq("non_deterministic_col")
val isDeterministic =
SparkPlanAnalyzerTesting.analyzeDataFrameDeterminism(df, columnsToAnalyze)
isDeterministic shouldBe false
}

it should "detect non-deterministic query filters" in withSparkAndTmpDir { (spark, tmpDir) =>
val df = spark
.range(10)
.toDF("id")
.filter(
nonDeterministicUDF() > 5
) // The filter contains non-deterministic predicates that can affect the results when executed multiple times

val columnsToAnalyze = Seq("id")
val isDeterministic =
SparkPlanAnalyzerTesting.analyzeDataFrameDeterminism(df, columnsToAnalyze)
isDeterministic shouldBe false

}

it should "return true if no columnsToAnalyze are provided" in withSparkAndTmpDir {
(spark, tmpDir) =>
val df = spark.range(10).toDF("id")
val isDeterministic = SparkPlanAnalyzerTesting.analyzeDataFrameDeterminism(df, Seq.empty)
isDeterministic shouldBe true
}

it should "return true if all columns are deterministic" in withSparkAndTmpDir {
(spark, tmpDir) =>
val df = spark.range(10).toDF("id")
val columnsToAnalyze = Seq("id")
val isDeterministic =
SparkPlanAnalyzerTesting.analyzeDataFrameDeterminism(df, columnsToAnalyze)
isDeterministic shouldBe true
}

it should "return true if filters are deterministic" in withSparkAndTmpDir { (spark, tmpDir) =>
val df = spark.range(10).toDF("id").filter("id > 5")
val isDeterministic = SparkPlanAnalyzerTesting.analyzeDataFrameDeterminism(df, Seq("id"))
isDeterministic shouldBe true
}

it should "mark a Qbeast Sample as deterministic" in withSparkAndTmpDir { (spark, tmpDir) =>
val qbeastDir = tmpDir + "/qbeast"
val df = spark.range(10).toDF("id")
df.write.format("qbeast").option("columnsToIndex", "id").save(qbeastDir)

val qbeastDF = spark.read.format("qbeast").load(qbeastDir)
val sampleDF = qbeastDF.sample(0.5)
val isDeterministic =
SparkPlanAnalyzerTesting.analyzeDataFrameDeterminism(sampleDF, Seq("id"))
isDeterministic shouldBe true
}

}