Skip to content

Commit

Permalink
fix: Added support to read iceberg tables partitioned by date (#6430)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Nov 26, 2024
1 parent 89007ff commit b93333f
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,14 @@
//
package io.deephaven.iceberg.base;

import io.deephaven.base.Pair;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand All @@ -26,7 +21,6 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.jetbrains.annotations.NotNull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.deephaven.util.type.TypeUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.iceberg.*;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
Expand Down Expand Up @@ -54,6 +55,13 @@ public IcebergKeyValuePartitionedLayout(
// in the output definition, so we can ignore duplicates.
final MutableInt icebergIndex = new MutableInt(0);
final Map<String, Integer> availablePartitioningColumns = partitionSpec.fields().stream()
.peek(partitionField -> {
// TODO (deephaven-core#6438): Add support to handle non-identity transforms
if (!partitionField.transform().isIdentity()) {
throw new TableDataException("Partition field " + partitionField.name() + " has a " +
"non-identity transform: " + partitionField.transform() + ", which is not supported");
}
})
.map(PartitionField::name)
.map(name -> instructions.columnRenames().getOrDefault(name, name))
.collect(Collectors.toMap(
Expand Down Expand Up @@ -89,11 +97,19 @@ IcebergTableLocationKey keyFromDataFile(
final PartitionData partitionData = (PartitionData) dataFile.partition();
for (final ColumnData colData : outputPartitioningColumns) {
final String colName = colData.name;
final Object colValue = partitionData.get(colData.index);
if (colValue != null && !colData.type.isAssignableFrom(colValue.getClass())) {
throw new TableDataException("Partitioning column " + colName
+ " has type " + colValue.getClass().getName()
+ " but expected " + colData.type.getName());
final Object colValue;
final Object valueFromPartitionData = partitionData.get(colData.index);
if (valueFromPartitionData != null) {
// TODO (deephaven-core#6438): Assuming identity transform here
colValue = IdentityPartitionConverters.convertConstant(
partitionData.getType(colData.index), valueFromPartitionData);
if (!colData.type.isAssignableFrom(colValue.getClass())) {
throw new TableDataException("Partitioning column " + colName
+ " has type " + colValue.getClass().getName()
+ " but expected " + colData.type.getName());
}
} else {
colValue = null;
}
partitions.put(colName, (Comparable<?>) colValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,14 +754,7 @@ void testPartitionedAppendWithAllPartitioningTypes() {
"DoublePC = (double) 4.0",
"LocalDatePC = LocalDate.parse(`2023-10-01`)")
.moveColumns(7, "data");

// TODO (deephaven-core#6419) Dropping the local data column since it is not supported on the read side.
// Remove this when the issue is fixed.
final TableDefinition tableDefinitionWithoutLocalDate = fromIceberg.dropColumns("LocalDatePC").getDefinition();
final Table fromIcebergWithoutLocalDate = tableAdapter.table(IcebergReadInstructions.builder()
.tableDefinition(tableDefinitionWithoutLocalDate)
.build());
assertTableEquals(expected.dropColumns("LocalDatePC"), fromIcebergWithoutLocalDate);
assertTableEquals(expected, fromIceberg);
}

@Test
Expand Down

0 comments on commit b93333f

Please sign in to comment.