
feat: DH-18143: Improve handling of sort order for Iceberg tables #6646

Open

wants to merge 13 commits into main
Conversation

@malhotrashivam (Contributor) commented Feb 13, 2025

Related to DH-18143

@malhotrashivam added this to the 0.38.0 milestone Feb 13, 2025
@malhotrashivam self-assigned this Feb 13, 2025
@malhotrashivam marked this pull request as draft February 13, 2025 21:54
@malhotrashivam force-pushed the nightly/sm-sort-order branch 2 times, most recently from fe2f89f to 67d1003 on February 14, 2025 19:04
@malhotrashivam removed the request for review from devinrsmith February 14, 2025 19:04
@malhotrashivam (Contributor, Author) commented:

This PR has three commits as follows:

  • Commit 1: If the manifest file indicates that the data file is sorted by specific columns, we should return those column names from TableLocation::getSortedColumns. This can later help with predicate-pushdown-style filtering (see the sketch at the end of this comment).
  • Commit 2: If a Deephaven table being written is sorted on a column, we should set the corresponding sort order in the manifest file.
  • Commit 3: If the table has a default sort order, we should allow the user to opt into or out of sorting on the default sort columns during writing.

I am not a fan of Commit 2, for two reasons (please review the PR so the following makes more sense):

  • Commit 2 adds extra complexity to the Iceberg writing code, which is not needed right now.
  • Commit 2 can only add a single column name to the data file, since we have a constraint (https://deephaven.atlassian.net/browse/DH-18700). That single column name is also added to the parquet file as a sort column. Therefore, even if we don't add the column name to the data file, Deephaven would still read it as a sort column through the parquet reading code, and all tests would pass. However, other Iceberg reading tools would not know about such sort columns if we don't add them to the data file.

So I think we can remove Commit 2 from this PR.
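
A minimal sketch (my own illustration, not part of the PR; canUseSortedSearch and filterColumn are hypothetical names, and the import paths are approximate) of how a downstream consumer might use the sorted-column hint from Commit 1:

import java.util.List;
import io.deephaven.api.SortColumn;
import io.deephaven.engine.table.impl.locations.TableLocation; // approximate package

// Hypothetical consumer: a pushdown-style filter could binary-search a location's data
// when the leading sort column reported by the location matches the filter column.
static boolean canUseSortedSearch(final TableLocation location, final String filterColumn) {
    final List<SortColumn> sorted = location.getSortedColumns();
    return !sorted.isEmpty() && sorted.get(0).column().name().equals(filterColumn);
}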

@malhotrashivam malhotrashivam marked this pull request as ready for review February 14, 2025 20:54
final Schema schema = sortOrder.schema();
final List<SortColumn> sortColumns = new ArrayList<>(sortOrder.fields().size());
for (final SortField field : sortOrder.fields()) {
    final ColumnName columnName = ColumnName.of(schema.findColumnName(field.sourceId()));
Member:

This might throw an InvalidNameException; we might need to wait for some Resolver work I'm doing in https://deephaven.atlassian.net/browse/DH-18365 to land so we can properly map field ids.
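
A small defensive sketch (my own illustration, not code from this PR; treating the failure as "stop reporting sort columns" is an assumption) of how the invalid-name case could be tolerated until the Resolver work lands:

import io.deephaven.api.ColumnName;

// Sketch only: tolerate Iceberg column names that are not legal Deephaven column names.
private static ColumnName tryMapColumnName(final String icebergColumnName) {
    try {
        return ColumnName.of(icebergColumnName);
    } catch (RuntimeException e) {
        // Assumed: ColumnName.of rejects illegal names with a RuntimeException
        // (e.g. InvalidNameException); a caller would stop collecting sort columns here.
        return null;
    }
}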

Member:

I would not mark this as resolved yet

Comment on lines +75 to +79
if (field.nullOrder() == NullOrder.NULLS_FIRST && field.direction() == SortDirection.ASC) {
    sortColumn = SortColumn.asc(columnName);
} else if (field.nullOrder() == NullOrder.NULLS_LAST && field.direction() == SortDirection.DESC) {
    sortColumn = SortColumn.desc(columnName);
} else {
Member:

We should raise the issue of nulls-first vs. nulls-last with the engine team. Arguably, this is something we should want to support.

Additionally, we may need to hold off on handling any floating-point columns:

-NaN < -Infinity < -value < -0 < 0 < value < Infinity < NaN (https://iceberg.apache.org/spec/#sorting)

The -NaN vs. NaN distinction is something I have not seen before, but it is another issue to raise with the engine team.

In the meantime, I think the strategy of breaking and returning what we have so far should be OK.
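
A minimal sketch (my own illustration; sortColumnsFromIceberg is a hypothetical helper name, and the types are those already used in the excerpt above) of the break-and-return strategy: map each Iceberg SortField to a Deephaven SortColumn and stop at the first field whose direction/null-order combination cannot be represented, returning the prefix mapped so far.

import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import io.deephaven.api.ColumnName;
import io.deephaven.api.SortColumn;

static List<SortColumn> sortColumnsFromIceberg(final SortOrder sortOrder) {
    final Schema schema = sortOrder.schema();
    final List<SortColumn> sortColumns = new ArrayList<>(sortOrder.fields().size());
    for (final SortField field : sortOrder.fields()) {
        final ColumnName columnName = ColumnName.of(schema.findColumnName(field.sourceId()));
        if (field.nullOrder() == NullOrder.NULLS_FIRST && field.direction() == SortDirection.ASC) {
            sortColumns.add(SortColumn.asc(columnName));
        } else if (field.nullOrder() == NullOrder.NULLS_LAST && field.direction() == SortDirection.DESC) {
            sortColumns.add(SortColumn.desc(columnName));
        } else {
            // Combinations such as NULLS_LAST ascending (or NaN-aware float ordering)
            // cannot be expressed as a Deephaven SortColumn today; keep only the prefix.
            break;
        }
    }
    return sortColumns;
}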

 * @return A stream of {@link DataFile} objects.
 */
public static Stream<DataFile> allDataFiles(@NotNull final Table table, @NotNull ManifestFile manifestFile) {
    return toStream(ManifestFiles.read(manifestFile, table.io()));
Member:

I recently learned that the files themselves may have metadata, i.e., org.apache.iceberg.ManifestReader#spec. It makes me cautious about extending these helper methods too far. While we aren't passing along ManifestReader#spec today, we may need to in the future and might need to model it appropriately.
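
A minimal sketch (my own illustration; allDataFilesWithSpec and the pair shape are hypothetical, not part of this PR) of what surfacing ManifestReader#spec alongside the files could look like:

import java.util.Map;
import java.util.stream.Stream;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

// Hypothetical helper: keep the manifest reader's partition spec instead of discarding it.
public static Map.Entry<PartitionSpec, Stream<DataFile>> allDataFilesWithSpec(
        final Table table, final ManifestFile manifestFile) {
    final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io());
    return Map.entry(reader.spec(), toStream(reader)); // toStream as in the existing helper above
}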

Comment on lines 59 to 68
private List<SortColumn> computeSortedColumns() {
    final Integer sortOrderId = dataFile.sortOrderId();
    // If sort order ID is missing, unknown or unsorted, we fall back to reading sort columns from the parquet file
    if (sortOrderId == null) {
        return super.getSortedColumns();
    }
    final SortOrder sortOrder = tableAdapter.icebergTable().sortOrders().get(sortOrderId);
    if (sortOrder == null || sortOrder.isUnsorted()) {
        return super.getSortedColumns();
    }
Member:

It's an interesting question: if the metadata exists on the file itself, should we prefer it? I can imagine a case where we are setting more specific sort column information in the file itself.

For example, maybe Iceberg knows this table is sorted on columns [A, B], but the parquet metadata gives us more information that it is sorted on columns [A, B, C].

There's also an argument to be made that we should completely ignore the metadata from the file itself, and only rely on Iceberg. This saves us from needing to materialize the parquet file metadata (at least from this code path). In particular, if Iceberg explicitly gives us back sortOrder.isUnsorted(), maybe we should be okay just returning an empty list?

It's also possible that we want this to be configurable... it's not obvious to me what the best course of action is.
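
A minimal sketch (my own illustration; the enum and method names are hypothetical) of what making this configurable could look like, covering the three options weighed above:

import java.util.List;
import io.deephaven.api.SortColumn;

// Hypothetical preference for which sort metadata wins when both sources are available.
enum SortMetadataSource {
    ICEBERG_ONLY,   // trust only the Iceberg sort order; never open the parquet metadata
    PARQUET_ONLY,   // ignore Iceberg; rely on the parquet footer (the current fallback path)
    PREFER_PARQUET  // use parquet metadata when present, otherwise the Iceberg sort order
}

static List<SortColumn> resolveSortedColumns(
        final SortMetadataSource source,
        final List<SortColumn> fromIceberg,   // possibly empty, e.g. when sortOrder.isUnsorted()
        final List<SortColumn> fromParquet) { // what the parquet-based path reports today
    switch (source) {
        case ICEBERG_ONLY:
            return fromIceberg;
        case PARQUET_ONLY:
            return fromParquet;
        case PREFER_PARQUET:
        default:
            return fromParquet.isEmpty() ? fromIceberg : fromParquet;
    }
}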

Comment on lines +42 to +50
public List<SortColumn> getSortedColumns() {
    return sortedColumns == null ? super.getSortedColumns() : sortedColumns;
}

@Nullable
private static List<SortColumn> computeSortedColumns(
        @NotNull final IcebergTableAdapter tableAdapter,
        @NotNull final DataFile dataFile) {
    final Integer sortOrderId = dataFile.sortOrderId();
Member:

Need to think about behavior when unsorted (either because the sort order ID is null or because it is explicitly set to unsorted)...

devinrsmith previously approved these changes Mar 6, 2025
@classmethod
def from_sort_id(cls, sort_order_id: int) -> 'SortOrderProvider':
    """
    Use the sort order with the given ID to sort new data while writing to the iceberg table.
Member:

I have no idea what any of this means. It needs more description, e.g., why would a sort order have an ID? How would I know what the IDs are? etc.

Contributor (Author):

Sort order is actually an Iceberg concept (https://iceberg.apache.org/spec/#sorting), and I don't want to add too much detail about it in our documentation.
So I have added a link to the spec in two places. Let me know if this makes it any better.
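
A minimal usage sketch (my own illustration; the module path deephaven.experimental.iceberg is an assumption, not verbatim from this PR):

from deephaven.experimental.iceberg import SortOrderProvider  # assumed module path

# Sort new data using the Iceberg table's sort order with ID 1; valid IDs are the
# sort orders recorded in the Iceberg table's own metadata (see the spec link above).
provider = SortOrderProvider.from_sort_id(1)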

Comment on lines 332 to 335
Returns a sort order provider that delegates to this provider for computing the columns to sort on, but writes a
different sort order ID to the iceberg table.
For example, this provider might return fields {A, B, C} to sort on, but the ID written to iceberg corresponds
to sort order with fields {A, B}.
Member:

Not clear enough. See earlier comments.

to sort order with fields {A, B}.

Args:
    sort_order_id (int): the sort order ID to write to the iceberg table.
Member:

Tables have sort order IDs stored with them? This is the kind of thing that isn't documented well enough. No part of this documentation would have suggested such a thing to me. There is a lot of assumed knowledge that needs to be made explicit in the docs for the new code.

Comment on lines 401 to 404
sort_order_provider: Optional[SortOrderProvider]: Used to provide SortOrder to be used for sorting new data
while writing to an iceberg table using this writer. Note that we select the sort order of the Table at
the time the writer is constructed, and it does not change if the table's sort order changes. Defaults
to `None`, which means use the table's default sort order.
Member:

See earlier comments on needing a more readable, clear, and expanded docstring.
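
A minimal sketch (my own illustration; TableParquetWriterOptions, my_table_definition, and the keyword names other than sort_order_provider are assumptions rather than verbatim API from this PR) of where this option would plug in:

from deephaven.experimental.iceberg import SortOrderProvider, TableParquetWriterOptions  # assumed imports

# Hypothetical: the sort order is captured when the writer options are built, as the
# docstring above notes, and new data is sorted by the table's sort order with ID 1.
writer_options = TableParquetWriterOptions(
    table_definition=my_table_definition,                    # hypothetical definition
    sort_order_provider=SortOrderProvider.from_sort_id(1),
)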

Comment on lines 274 to 276
class SortOrderProvider(JObjectWrapper):
    """
    :class:`.SortOrderProvider` is used for providing SortOrder to be used for sorting new data while writing to an
Contributor:

  1. I don't see any test for the new class.
  2. The name SortOrderProvider seems to suggest that it might be used to provide a SortOrder object to be used when working with Iceberg data, but in fact all the factory methods return an instance of itself. I am not opposed to this pattern per se, but the docstrings on the methods could probably be more explicit.

Contributor (Author):

We still don't have unit testing for Iceberg. We have an open ticket for that (DH-18261), but it hasn't been prioritised yet.
