[Flink] Support withShard read of Flink #5650
Purpose
In my company, we use a Flink session cluster to read Paimon for OLAP. When query QPS is high, several problems become apparent.
1. The JobManager's IO is high, because every job's plan reads metadata from the FileSystem.
2. To accelerate queries we scale up scan.manifest.parallelism, which drives up JobManager memory usage.
3. Queries become slow: when QPS is high the JobManager is busy, which slows down planning and the rest of the query logic.
4. When QPS is high, memory pressure is heavy, even though we DropStats after planning.
The root cause is that every job's plan runs on the JobManager. So that the system can handle higher QPS and queries can run faster, we want to add an option that lets users move planning to the TaskManagers.
In more detail: when the option is enabled, ShardStaticFileStoreSource creates as many splits as the source parallelism, and these splits are assigned to the readers. In ShardSourceReader#addSplits we use the TableScan withShard feature to plan the splits that belong to this reader (see the sketch below).
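
The sketch below is illustrative, not the actual PR code: it shows how a reader subtask could plan only its own shard of splits instead of depending on a single plan computed on the JobManager. ShardSourceReader and its wiring are introduced by this PR; the withShard(subtaskIndex, parallelism) call is assumed to be the existing Paimon ReadBuilder API the text refers to.

```java
// Minimal sketch (not the PR implementation): plan splits on the reader side,
// restricted to this subtask's shard, so metadata reading and planning cost
// is spread across TaskManagers instead of concentrating on the JobManager.
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;

import java.util.List;

public class ShardPlanSketch {

    /** Plan only the splits that belong to the given reader subtask. */
    public static List<Split> planShard(Table table, int subtaskIndex, int parallelism) {
        ReadBuilder readBuilder =
                table.newReadBuilder()
                        // Assumed API: restrict the scan to this subtask's shard.
                        .withShard(subtaskIndex, parallelism);
        return readBuilder.newScan().plan().splits();
    }
}
```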

When DynamicPartitionPruning is enabled, we add the DynamicPartitionPruning info to the MockSplit; after the reader receives it, it filters partitions after planning with withShard (a sketch follows).
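
As a hedged sketch of that filtering step (not the PR implementation): once the reader has received the pruning info carried in the MockSplit, it can drop the splits whose partition was pruned. The allowedPartitions set here is a hypothetical input representing that info; DataSplit#partition() is the existing Paimon API.

```java
// Minimal sketch: keep only the splits whose partition survives dynamic partition pruning.
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DppFilterSketch {

    /** Filter the planned splits against the partitions allowed by dynamic partition pruning. */
    public static List<Split> filterByPartitions(List<Split> planned, Set<BinaryRow> allowedPartitions) {
        return planned.stream()
                .filter(split -> split instanceof DataSplit
                        && allowedPartitions.contains(((DataSplit) split).partition()))
                .collect(Collectors.toList());
    }
}
```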

Tests
1. UT cases
org.apache.paimon.flink.source.shardread.ShardReadAssignModeTest
org.apache.paimon.flink.source.shardread.ShardSourceReaderTest
2. IT cases
org.apache.paimon.flink.source.shardread.ShardReadITCase
3. Job tests

We ran several jobs to verify data integrity; all results were correct.
(1) Without ShardRead.
(2) ShardRead without failover, without speculative execution.
(3) ShardRead without failover, with speculative execution.
(4) ShardRead with failover, without speculative execution.
(5) ShardRead with failover, with speculative execution.

API and Format
Documentation