Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import org.apache.spark.api.python.PythonUtils
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.connect.service.SparkConnectService
import org.apache.spark.sql.pipelines.graph.DataflowGraph
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.pipelines.graph.{DataflowGraph, PipelineUpdateContextImpl}
import org.apache.spark.sql.pipelines.utils.{EventVerificationTestHelpers, TestPipelineUpdateContextMixin}

/**
Expand Down Expand Up @@ -434,6 +435,36 @@ class PythonPipelineSuite
.map(_.identifier) == Seq(graphIdentifier("a"), graphIdentifier("something")))
}

test("MV/ST with partition columns works") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add a test for what happens when a user attempts to change partition columns between pipeline runs?

I'd expect that to either trigger a full refresh or throw an exception. Either are probably acceptable, but it'd be nice for this behavior to be tested.

Copy link
Contributor Author

@JiaqiWang18 JiaqiWang18 Aug 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we already have a test for this: specifying partition column different from existing partitioned table in MaterializeTablesSuite, it throws a CANNOT_UPDATE_PARTITION_COLUMNS error

val graph = buildGraph("""
|from pyspark.sql.functions import col
|
|@dp.materialized_view(partition_cols = ["id_mod"])
|def mv():
| return spark.range(5).withColumn("id_mod", col("id") % 2)
|
|@dp.table(partition_cols = ["id_mod"])
|def st():
| return spark.readStream.table("mv")
|""".stripMargin)

val updateContext = new PipelineUpdateContextImpl(graph, eventCallback = _ => ())
updateContext.pipelineExecution.runPipeline()
updateContext.pipelineExecution.awaitCompletion()
Comment on lines +451 to +453
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use PipelineTest.startPipelineAndWaitForCompletion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we actually don't extend PipelineTest in PythonPipelineSuite, adding it is causing some conflict in the inheritance hierarchy. PipelineTest does have a lot of helpful methods like checkAnswer, might worth to do a refactor separately


// check table is created with correct partitioning
val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]

Seq("mv", "st").foreach { tableName =>
val table = catalog.loadTable(Identifier.of(Array("default"), tableName))
assert(table.partitioning().map(_.references().head.fieldNames().head) === Array("id_mod"))

val rows = spark.table(tableName).collect().map(r => (r.getLong(0), r.getLong(1))).toSet
val expected = (0 until 5).map(id => (id.toLong, (id % 2).toLong)).toSet
assert(rows == expected)
Comment on lines +462 to +464
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use checkAnswer

}
}

test("create pipeline without table will throw RUN_EMPTY_PIPELINE exception") {
checkError(
exception = intercept[AnalysisException] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ class BatchTableWrite(
}
dataFrameWriter
.mode("append")
// In "append" mode with saveAsTable, partition columns must be specified in query
// because the format and options of the existing table is used, and the table could
// have been created with partition columns.
.partitionBy(destination.partitionCols.getOrElse(Seq.empty): _*)
.saveAsTable(destination.identifier.unquotedString)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.pipelines.graph

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.pipelines.utils.{PipelineTest, TestGraphRegistrationContext}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{LongType, StructType}
Expand Down Expand Up @@ -266,6 +267,44 @@ class SqlPipelineSuite extends PipelineTest with SharedSparkSession {
)
}

test("MV/ST with partition columns works") {
val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
sqlText = """
|CREATE MATERIALIZED VIEW mv
|PARTITIONED BY (id_mod)
|AS
|SELECT
| id,
| id % 2 AS id_mod
|FROM range(3);
|
|CREATE STREAMING TABLE st
|PARTITIONED BY (id_mod)
|AS
|SELECT * FROM STREAM(mv);
|""".stripMargin
)
startPipelineAndWaitForCompletion(unresolvedDataflowGraph)
val expected = Seq(
Row(0, 0),
Row(1, 1),
Row(2, 0)
)
val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]

Seq("mv", "st").foreach { tableName =>
// check table partition columns
val table = catalog.loadTable(Identifier.of(Array("test_db"), tableName))
assert(table.partitioning().map(_.references().head.fieldNames().head) === Array("id_mod"))

// check table data
checkAnswer(
spark.sql(s"SELECT * FROM ${fullyQualifiedIdentifier(tableName)}"),
expected
)
}
}

test("Exception is thrown when non-identity partition columns are used") {
val graphRegistrationContext = new TestGraphRegistrationContext(spark)
val sqlGraphRegistrationContext = new SqlGraphRegistrationContext(graphRegistrationContext)
Expand Down