
Issue #479: Error Control for Non-Deterministic Source Queries #512

Closed

Conversation

@osopardo1 (Member) commented Dec 11, 2024

Issue Description

#414 revealed a significant limitation in the writing process for Spark.

The current approach requires multiple traversals over the DataFrame, including the following steps:

  1. Collecting statistics from the data: Calculating stats such as min/max values for the columns to index and the count of elements.
  2. Estimating the index: Determining how the data should be indexed.
  3. Indexing rows: Assigning each row to a cube.
  4. Writing data: Grouping rows by cube and writing them to files.

This repeated loading of the DataFrame introduces potential inconsistencies if the input data source is:

  1. modified by another process, or
  2. built using non-deterministic functions that produce different results across executions.

If either of these situations occurs, the results from one of the writing steps described above will not be reliable for the next. For instance, if a change occurs between steps 1 and 2 or steps 1 and 3, we may be unable to correctly position the row within the space. If a change to the source occurs after step 2, the estimated index may not be 100% correct for the changed data.
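To make the failure mode concrete, here is a minimal Spark snippet (with a hypothetical column name) showing how a non-deterministic source yields different statistics on every pass:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, min, rand}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// A source built with a non-deterministic function: each Spark action
// re-evaluates rand(), producing different values on every pass.
val df = spark.range(1000000L).withColumn("value", rand())

// Pass 1: collect statistics (step 1 of the writing process).
val firstPass = df.agg(min("value"), max("value")).first()

// Pass 2: any later traversal (steps 2-4) re-executes the plan,
// so the rows no longer match the statistics gathered above.
val secondPass = df.agg(min("value"), max("value")).first()
// firstPass and secondPass will almost certainly differ.
```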

Proposed Solution

To address part of this issue, we propose introducing a new agent, the SparkPlanAnalyzer, to improve error handling for non-deterministic queries before processing the data.
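The PR does not reproduce the implementation here, but a minimal sketch of what such an analyzer could look like (the object name and the exact plan patterns are assumptions) is:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.{Filter, GlobalLimit, LocalLimit, Sample}

// Hypothetical sketch of a plan analyzer: walk the logical plan and flag
// the non-deterministic shapes listed in the next section.
object SparkPlanAnalyzerSketch {

  def isSourceDeterministic(df: DataFrame): Boolean = {
    val plan = df.queryExecution.logical
    plan.find {
      case _: GlobalLimit | _: LocalLimit => true // LIMIT clause
      case _: Sample => true // SAMPLE command
      case Filter(condition, _) => !condition.deterministic // e.g. rand()
      case _ => false
    }.isEmpty
  }
}
```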

Unsupported Non-Deterministic Queries

To define precisely which types of queries are no longer supported, let's list the non-deterministic query plans that Spark can encounter (a snippet after the list illustrates each case):

  • LIMIT -> Applying a LIMIT clause twice (if the source is not sorted) would lead to different results.
  • SAMPLE -> Spark's SAMPLE command uses random sampling to extract the specified percentage of rows. Unless the source is a Qbeast table, we cannot guarantee deterministic results.
  • FILTERING with a Non-Deterministic Column -> Using rand() or other non-deterministic predicates would lead to different results on each execution.
  • Indexing a Non-Deterministic Column -> Computing statistics over a non-deterministic column would cause a mismatch between the Transformations computed in step 1 and the results of steps 2 and 3. No error should be raised if all indexing columns are deterministic.
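For illustration, here is how each of the listed cases looks in Spark code (assuming an active SparkSession `spark`; the source path is hypothetical):

```scala
import org.apache.spark.sql.functions.rand

val source = spark.read.parquet("/tmp/source") // hypothetical path

// LIMIT on an unsorted source: two evaluations may return different rows.
val limited = source.limit(1000)

// SAMPLE: random sampling can select different rows on each pass.
val sampled = source.sample(0.1)

// FILTER with a non-deterministic predicate.
val filtered = source.filter(rand() > 0.5)

// Non-deterministic indexing column: statistics computed in step 1
// will not match the values produced in steps 2 and 3.
val projected = source.withColumn("noise", rand())
```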

User Workaround

This approach provides users with four options (a sketch of options 2 and 3 follows the list):

  • Modify the query: Ensure that the query (for the indexing columns) is deterministic to avoid inconsistencies.
  • Materialize the query results before indexing: instead of indexing the DataFrame directly, first save the data in another format and index that copy.
  • Add columnStats using the .option method: By providing column statistics directly, users can mitigate these problems. However, this does not guarantee that the final written values will match those produced by the initial query.
  • Use more flexible Transformer Types: Changing the default LinearTransformation for numeric columns to Quantiles would map all values outside the min/max range to the extremes of the space.
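As an illustration, here is a hedged sketch of options 2 and 3 (the paths, the indexed column name, and the columnStats JSON keys are assumptions; check the Qbeast documentation for the exact format):

```scala
// Option 2: materialize first, then index the stable copy.
df.write.format("parquet").save("/tmp/materialized") // hypothetical path
val stable = spark.read.parquet("/tmp/materialized")

// Option 3: provide column statistics up front so the first pass over the
// (possibly changing) source is not needed to build the Revision.
stable.write
  .format("qbeast")
  .option("columnsToIndex", "value")
  .option("columnStats", """{"value_min":0.0,"value_max":1.0}""")
  .save("/tmp/qbeast-table") // hypothetical path
```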

Type of change

New feature.

Checklist:

Here is the list of things you should do before submitting this pull request:

  • New feature/bug fix has been committed following the Contribution guide.
  • Add logging to the code following the Contribution guide.
  • Add comments to the code (make it easier for the community!).
  • Change the documentation.
  • Add tests.
  • Your branch is updated to the main branch (dependent changes have been merged).

How Has This Been Tested? (Optional)

This is tested with QbeastInputSourcesTest. Added tests for error control when using:

  • LIMIT
  • FILTER BY Non-Deterministic Columns
  • SAMPLE
  • Non-deterministic expressions in the projection

@osopardo1 (Member, Author) commented:

This PR is blocked until we complete #520. Since the determinism of the DataFrame is part of the Revision Flow/DataFrame analysis, we want a proper logic component in place before introducing new changes.

@osopardo1 osopardo1 marked this pull request as ready for review December 20, 2024 08:00
@osopardo1 (Member, Author) commented Jan 10, 2025

After some discussion, we agreed to:

  • Log a Warning instead of throwing an Error when indexing Non-Deterministic Columns or Queries. The Warning should advise the user that the query is non-deterministic and that a failure can be expected during the process (if the data falls outside the Revision boundaries). It should also mention the two possible solutions:
    • Add columnStats
    • Materialize the data in a previous step
  • Group the Transformers that might be impacted by non-determinism (in this case, just the LinearTransformer) and apply the checks only to those.
  • Move the Determinism checks to the SparkRevisionFactory component.
  • Error control on LinearTransformation.transform -> values outside [0.0, 1.0) would cause the indexing process to stop. The error should also list the three possible solutions: columnStats, quantiles, and materialization. A sketch of this check follows the list.
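A minimal sketch of that last point (the method shape and message are assumptions, not the actual Qbeast code):

```scala
// Hypothetical sketch: a linear transformation maps values into [0.0, 1.0);
// a result outside that range means the value falls outside the Revision
// boundaries computed from the original statistics, so indexing must stop.
def transform(value: Double, minValue: Double, maxValue: Double): Double = {
  val scaled = (value - minValue) / (maxValue - minValue)
  if (scaled < 0.0 || scaled >= 1.0) {
    throw new IllegalArgumentException(
      s"Value $value is outside the Revision space [$minValue, $maxValue). " +
        "Possible solutions: provide columnStats, switch to quantiles, " +
        "or materialize the source before writing.")
  }
  scaled
}
```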

@osopardo1 osopardo1 requested a review from Jiaweihu08 January 14, 2025 06:49
@osopardo1 (Member, Author) commented:

UPDATE:

We weren't confident enough in the solution, and we recently found out that Delta Lake faces a similar issue when executing MERGE INTO.

They solved it by materializing the data involved in the operation before executing the second pass. We can introduce that change in a second step. I would advocate for the following:

  1. Add the logic to detect non-deterministic sources.
  2. Add the logic to materialize the data on demand (this includes configuring the storage level, enforcement, etc.). A sketch follows.
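A sketch of what on-demand materialization could look like (the function shape and configuration key are assumptions, not the final design):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

// Hypothetical sketch of step 2: pin non-deterministic sources before the
// multi-pass write, following the Delta Lake MERGE INTO approach.
def materializeIfNeeded(df: DataFrame, isDeterministic: Boolean): DataFrame = {
  if (isDeterministic) df
  else {
    // Configurable storage level; the key name is an assumption.
    val level = df.sparkSession.conf
      .get("spark.qbeast.index.materializationStorageLevel", "MEMORY_AND_DISK")
    val pinned = df.persist(StorageLevel.fromString(level))
    pinned.count() // force evaluation so later passes reuse the same rows
    // Note: a persisted plan can still be recomputed if executors are lost;
    // checkpointing would give stronger guarantees.
    pinned
  }
}
```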

@osopardo1 osopardo1 requested a review from Jiaweihu08 January 22, 2025 08:39
@osopardo1 (Member, Author) commented:

Closing due to inactivity. I will update the documentation to explain how to proceed in case of error.

@osopardo1 osopardo1 closed this Jan 28, 2025