Conversation

JiaqiWang18 (Contributor) commented Aug 25, 2025

What changes were proposed in this pull request?

Propagate the partition columns specified on the destination table into the batch table write during flow execution.

This fixes the following exception during flow execution:

org.apache.spark.sql.AnalysisException: Specified partitioning does not match that of the existing table spark_catalog.default.mv.
Specified partition columns: [].
Existing partition columns: [id_mod].

Why are the changes needed?

In "append" mode with saveAsTable, the partition columns must be specified in the query, because the format and options of the existing table are reused, and that table may have been created with partition columns.
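The failing check and the fix can be sketched with a small standalone model (plain Python, not Spark's actual implementation; `AnalysisException` and `append_to_table` here are illustrative stand-ins for the real saveAsTable append path):

```python
class AnalysisException(Exception):
    """Stand-in for org.apache.spark.sql.AnalysisException."""
    pass


def append_to_table(existing_partition_cols, specified_partition_cols):
    """Model of the append-mode check: the partitioning specified on the
    write must match the partitioning of the existing table."""
    if list(specified_partition_cols) != list(existing_partition_cols):
        raise AnalysisException(
            "Specified partitioning does not match that of the existing table. "
            f"Specified partition columns: {list(specified_partition_cols)}. "
            f"Existing partition columns: {list(existing_partition_cols)}."
        )
    return "ok"


# Before the fix: flow execution passed no partition columns, so the
# check fails exactly as in the stack trace above.
try:
    append_to_table(["id_mod"], [])
except AnalysisException as e:
    print("fails:", e)

# After the fix: the partition columns declared on the destination table
# are propagated into the write, so the check passes.
print(append_to_table(["id_mod"], ["id_mod"]))
```

This only models the shape of the check; the actual change wires the destination table's partition columns into the writer used by the flow.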

Does this PR introduce any user-facing change?

No

How was this patch tested?

New unit tests

Was this patch authored or co-authored using generative AI tooling?

No

JiaqiWang18 (Contributor, Author) commented:

@anishm-db @gengliangwang

Comment on lines +450 to +452
val updateContext = new PipelineUpdateContextImpl(graph, eventCallback = _ => ())
updateContext.pipelineExecution.runPipeline()
updateContext.pipelineExecution.awaitCompletion()
Contributor:

nit: use PipelineTest.startPipelineAndWaitForCompletion

Contributor Author:

We actually don't extend PipelineTest in PythonPipelineSuite; adding it causes some conflicts in the inheritance hierarchy. PipelineTest does have a lot of helpful methods like checkAnswer, so it might be worth doing that refactor separately.


// check table is created with correct partitioning
Seq("mv", "st").foreach { tableName =>
val table = spark.sessionState.catalog.getTableMetadata(graphIdentifier(tableName))
Contributor:

nit: let's use the DSv2 API, i.e. spark.sessionState.catalogManager

Comment on lines +459 to +461
val rows = spark.table(tableName).collect().map(r => (r.getLong(0), r.getLong(1))).toSet
val expected = (0 until 5).map(id => (id.toLong, (id % 2).toLong)).toSet
assert(rows == expected)
Contributor:

nit: use checkAnswer

Comment on lines 295 to 298
val tableMeta = spark.sessionState.catalog.getTableMetadata(
fullyQualifiedIdentifier(tableName)
)
assert(tableMeta.partitionColumnNames == Seq("id_mod"))
Contributor:

Same nit as above: use DSv2 if possible.

@@ -434,6 +434,34 @@ class PythonPipelineSuite
.map(_.identifier) == Seq(graphIdentifier("a"), graphIdentifier("something")))
}

test("MV/ST with partition columns works") {
Contributor:

Should we also add a test for what happens when a user attempts to change partition columns between pipeline runs?

I'd expect that to either trigger a full refresh or throw an exception. Either is probably acceptable, but it'd be nice for this behavior to be tested.

JiaqiWang18 (Contributor, Author) commented Aug 29, 2025:

I think we already have a test for this: "specifying partition column different from existing partitioned table" in MaterializeTablesSuite; it throws a CANNOT_UPDATE_PARTITION_COLUMNS error.
