[spark], [infra] run spark integration tests in CI. #5590

Open · zhongyujiang wants to merge 1 commit into master
Conversation

zhongyujiang (Contributor)

Purpose

The current GitHub CI for the Spark module is missing integration tests, and some of Spark's integration tests are actually failing; they have simply been ignored by CI all along.

Tests

API and Format

Documentation

@zhongyujiang (Contributor, Author) left a comment

cc @Zouxxyy @YannByron, can you please take a look when you have time? Thanks!

@@ -58,6 +58,6 @@ jobs:
            test_modules+="org.apache.paimon:paimon-spark-${suffix},"
          done
          test_modules="${test_modules%,}"
-         mvn -T 2C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
+         mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
@zhongyujiang (Contributor, Author):

The `test` phase only covers unit tests, not integration tests.
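
For context: Maven's `test` phase runs Surefire, whose default includes only pick up unit-test class names such as `*Test`, while `verify` additionally executes the Failsafe-bound integration tests, which by default match `IT*`, `*IT`, and `*ITCase`. Assuming the build follows these default naming conventions, the minimal sketch below would be skipped by `mvn test` but run by `mvn verify`:

```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical example: the "ITCase" suffix matches Failsafe's default
// includes and none of Surefire's, so (under the default convention)
// this class is only executed in the integration-test/verify phases.
class ExampleITCase extends AnyFunSuite {
  test("only runs under mvn verify") {
    assert(1 + 1 == 2)
  }
}
```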

@zhongyujiang (Contributor, Author):

Some unit tests are failing; it looks like this is related to the changes in the time travel part. Let me take a look.

// Returns an empty Optional instead of failing when the time-travel
// snapshot cannot be found.
return snapshot.isPresent()
        ? Optional.of(
                schemaManager().schema(snapshot.get().schemaId()).copy(options.toMap()))
        : Optional.empty();
@zhongyujiang (Contributor, Author):

This was changed to throw an exception directly when time travel fails to find the snapshot.

But while investigating the time-travel-related test failures, I found that Paimon currently seems to allow querying a non-existent snapshot, returning an empty result.
Is this behavior reasonable? Shouldn't an exception be thrown when time travel fails to find the snapshot?

// set scan.snapshot-id = 4, this query will read data from the next commit.
val query2 = spark.readStream
  .format("paimon")
  .option("scan.snapshot-id", 4)
  .load(location)
  .writeStream
  .format("memory")
  .option("checkpointLocation", checkpointDir2.getCanonicalPath)
  .queryName("mem_table2")
  .outputMode("append")
  .start()

val currentResult1 = () => spark.sql("SELECT * FROM mem_table1")
val currentResult2 = () => spark.sql("SELECT * FROM mem_table2")
try {
  query1.processAllAvailable()
  query2.processAllAvailable()

  var totalStreamingData1 = latestChanges
  var totalStreamingData2 = Seq.empty[Row]
  checkAnswer(currentResult1(), totalStreamingData1)
  checkAnswer(currentResult2(), totalStreamingData2)

  spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 'v_42')")
  query1.processAllAvailable()
  query2.processAllAvailable()

  totalStreamingData1 =
    totalStreamingData1 ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
  totalStreamingData2 =
    totalStreamingData2 ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
  checkAnswer(currentResult1(), totalStreamingData1)
  checkAnswer(currentResult2(), totalStreamingData2)
} finally {
  query1.stop()
  query2.stop()
}
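
The streaming test above exercises the question from the streaming side. For comparison, a minimal batch sketch of the behavior described in the comment, with an illustrative table path and an assumed non-existent snapshot id:

```scala
// Hypothetical batch time travel to a snapshot id that was never committed.
// Per the behavior described above, this currently yields an empty result;
// the alternative under discussion is to throw an exception instead.
val missing = spark.read
  .format("paimon")
  .option("scan.snapshot-id", 9999) // assumed non-existent snapshot
  .load("/tmp/paimon/T") // illustrative table location
missing.show() // prints an empty table today rather than failing
```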
