From c471a258d1b2eb79a8f4794ccb02e8ca298ad836 Mon Sep 17 00:00:00 2001 From: adelly13 Date: Wed, 16 Oct 2024 00:11:58 -0700 Subject: [PATCH 1/2] Added Unit Test for Flink-Spark Equality Delete Write --- flink-spark-bundle/build.gradle | 80 ++++ .../TestFlinkIcebergSinkV2Base.java | 390 ++++++++++++++++++ .../iceberg/flinkspark/TestFlinkSpark.java | 348 ++++++++++++++++ settings.gradle | 2 + 4 files changed, 820 insertions(+) create mode 100644 flink-spark-bundle/build.gradle create mode 100644 flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkIcebergSinkV2Base.java create mode 100644 flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkSpark.java diff --git a/flink-spark-bundle/build.gradle b/flink-spark-bundle/build.gradle new file mode 100644 index 000000000000..5c79e7367d34 --- /dev/null +++ b/flink-spark-bundle/build.gradle @@ -0,0 +1,80 @@ +// Combined build.gradle for a new module that includes both Flink and Spark + +plugins { + id 'java' + id 'scala' +} + +String flinkMajorVersion = '1.20' +String sparkMajorVersion = '3.5' +String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") + +repositories { + mavenCentral() +} + +dependencies { + // Common dependencies + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + api project(':iceberg-api') + implementation project(':iceberg-common') + implementation project(':iceberg-core') + implementation project(':iceberg-data') + implementation project(':iceberg-orc') + implementation project(':iceberg-parquet') + implementation project(':iceberg-hive-metastore') + implementation libs.datasketches + + implementation project(":iceberg-flink:iceberg-flink-1.20") +// testImplementation project(":iceberg-flink:iceberg-flink-1.20") + implementation project(":iceberg-flink:iceberg-flink-runtime-1.20") + + implementation project(":iceberg-spark:iceberg-spark-3.5_2.12") + implementation project(":iceberg-spark:iceberg-spark-extensions-3.5_2.12") + implementation project(":iceberg-spark:iceberg-spark-runtime-3.5_2.12") + + testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') + testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') + testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') + testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts') + testImplementation project(path: ':iceberg-flink:iceberg-flink-1.20', configuration: 'testArtifacts') + + + // Flink-specific dependencies + compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink120.get()}" + compileOnly libs.flink120.avro + compileOnly libs.flink120.metrics.dropwizard + compileOnly libs.flink120.streaming.java + compileOnly libs.flink120.table.api.java.bridge + compileOnly libs.flink120.connector.base + compileOnly libs.flink120.connector.files + + // Spark-specific dependencies + compileOnly "org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive35.get()}" + implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}") + implementation libs.caffeine + implementation libs.parquet.column + implementation libs.parquet.hadoop + implementation libs.arrow.vector + + // Conditional dependency for Scala 2.12 + if (scalaVersion == '2.12') { + implementation 'org.scala-lang:scala-library:2.12.18' + } + + // Test 
dependencies + testImplementation libs.flink120.connector.test.utils + testImplementation libs.flink120.core + testImplementation libs.flink120.runtime + testImplementation libs.awaitility + testImplementation libs.assertj.core + testImplementation libs.sqlite.jdbc + testImplementation libs.hadoop2.minicluster + testImplementation libs.junit.vintage.engine + testImplementation libs.junit.jupiter + testImplementation libs.slf4j.simple +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkIcebergSinkV2Base.java b/flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkIcebergSinkV2Base.java new file mode 100644 index 000000000000..3d3673bbf12d --- /dev/null +++ b/flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkIcebergSinkV2Base.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flinkspark; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.StructLikeSet; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class TestFlinkIcebergSinkV2Base { + + static final int FORMAT_V2 = 2; + static final TypeInformation ROW_TYPE_INFO = + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + + static final int ROW_ID_POS = 0; + static final int ROW_DATA_POS = 1; + + TableLoader tableLoader; + Table table; + StreamExecutionEnvironment env; + + 
@Parameter(index = 0) + FileFormat format; + + @Parameter(index = 1) + int parallelism = 1; + + @Parameter(index = 2) + boolean partitioned; + + @Parameter(index = 3) + String writeDistributionMode; + + @Parameters(name = "FileFormat={0}, Parallelism={1}, Partitioned={2}, WriteDistributionMode={3}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {FileFormat.AVRO, 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, + new Object[] {FileFormat.AVRO, 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, + new Object[] {FileFormat.AVRO, 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, + new Object[] {FileFormat.AVRO, 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, + new Object[] {FileFormat.ORC, 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, + new Object[] {FileFormat.ORC, 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, + new Object[] {FileFormat.ORC, 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, + new Object[] {FileFormat.ORC, 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, + new Object[] {FileFormat.PARQUET, 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}, + new Object[] {FileFormat.PARQUET, 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}, + new Object[] {FileFormat.PARQUET, 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}, + new Object[] {FileFormat.PARQUET, 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE} + }; + } + + static final Map ROW_KIND_MAP = + ImmutableMap.of( + "+I", RowKind.INSERT, + "-D", RowKind.DELETE, + "-U", RowKind.UPDATE_BEFORE, + "+U", RowKind.UPDATE_AFTER); + + Row row(String rowKind, int id, String data) { + RowKind kind = ROW_KIND_MAP.get(rowKind); + if (kind == null) { + throw new IllegalArgumentException("Unknown row kind: " + rowKind); + } + + return Row.ofKind(kind, id, data); + } + + void testUpsertOnIdDataKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")), + ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")), + ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "aaa"), record(2, "bbb")), + ImmutableList.of(record(1, "aaa"), record(2, "ccc")), + ImmutableList.of(record(1, "bbb"), record(2, "ccc"))); + testChangeLogs( + ImmutableList.of("id", "data"), + row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), + true, + elementsPerCheckpoint, + expectedRecords, + branch); + } + + void testChangeLogOnIdDataKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of( + row("+I", 1, "aaa"), + row("-D", 1, "aaa"), + row("+I", 2, "bbb"), + row("+I", 1, "bbb"), + row("+I", 2, "aaa")), + ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")), + ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")), + ImmutableList.of( + record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")), + ImmutableList.of( + record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb"))); + + testChangeLogs( + ImmutableList.of("data", "id"), + row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), + false, + elementsPerCheckpoint, + expectedRecords, + branch); + } + + void 
testChangeLogOnSameKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + // Checkpoint #1 + ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")), + // Checkpoint #2 + ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")), + // Checkpoint #3 + ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")), + // Checkpoint #4 + ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "aaa")), + ImmutableList.of(record(1, "aaa")), + ImmutableList.of(record(1, "aaa")), + ImmutableList.of(record(1, "aaa"), record(1, "aaa"))); + + testChangeLogs( + ImmutableList.of("id", "data"), + row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), + false, + elementsPerCheckpoint, + expectedRecords, + branch); + } + + void testChangeLogOnDataKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of( + row("+I", 1, "aaa"), + row("-D", 1, "aaa"), + row("+I", 2, "bbb"), + row("+I", 1, "bbb"), + row("+I", 2, "aaa")), + ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")), + ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"), row("+I", 2, "ccc"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "bbb"), record(2, "aaa")), + ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")), + ImmutableList.of( + record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))); + + testChangeLogs( + ImmutableList.of("data"), + row -> row.getField(ROW_DATA_POS), + false, + elementsPerCheckpoint, + expectedRecords, + branch); + } + + void testUpsertOnDataKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")), + ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")), + ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(2, "aaa"), record(3, "bbb")), + ImmutableList.of(record(4, "aaa"), record(5, "bbb")), + ImmutableList.of(record(6, "aaa"), record(7, "bbb"))); + + testChangeLogs( + ImmutableList.of("data"), + row -> row.getField(ROW_DATA_POS), + true, + elementsPerCheckpoint, + expectedRecords, + branch); + } + + void testChangeLogOnIdKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of( + row("+I", 1, "aaa"), + row("-D", 1, "aaa"), + row("+I", 1, "bbb"), + row("+I", 2, "aaa"), + row("-D", 2, "aaa"), + row("+I", 2, "bbb")), + ImmutableList.of( + row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")), + ImmutableList.of( + row("-D", 1, "bbb"), + row("+I", 1, "ccc"), + row("-D", 1, "ccc"), + row("+I", 1, "ddd"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "bbb"), record(2, "bbb")), + ImmutableList.of(record(1, "bbb"), record(2, "ddd")), + ImmutableList.of(record(1, "ddd"), record(2, "ddd"))); + + if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) { + assertThatThrownBy( + () -> + testChangeLogs( + ImmutableList.of("id"), + row -> row.getField(ROW_ID_POS), + false, + elementsPerCheckpoint, + expectedRecords, + branch)) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith( + "In 'hash' distribution mode with equality 
fields set, partition field") + .hasMessageContaining("should be included in equality fields:"); + + } else { + testChangeLogs( + ImmutableList.of("id"), + row -> row.getField(ROW_ID_POS), + false, + elementsPerCheckpoint, + expectedRecords, + branch); + } + } + + void testUpsertOnIdKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")), + ImmutableList.of(row("+I", 1, "ccc")), + ImmutableList.of(row("+U", 1, "ddd"), row("+I", 1, "eee"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "bbb")), + ImmutableList.of(record(1, "ccc")), + ImmutableList.of(record(1, "eee"))); + + if (!partitioned) { + testChangeLogs( + ImmutableList.of("id"), + row -> row.getField(ROW_ID_POS), + true, + elementsPerCheckpoint, + expectedRecords, + branch); + } else { + assertThatThrownBy( + () -> + testChangeLogs( + ImmutableList.of("id"), + row -> row.getField(ROW_ID_POS), + true, + elementsPerCheckpoint, + expectedRecords, + branch)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("should be included in equality fields:"); + } + } + + void testChangeLogs( + List equalityFieldColumns, + KeySelector keySelector, + boolean insertAsUpsert, + List> elementsPerCheckpoint, + List> expectedRecordsPerCheckpoint, + String branch) + throws Exception { + DataStream dataStream = + env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO); + + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .tableLoader(tableLoader) + .tableSchema(SimpleDataUtil.FLINK_SCHEMA) + .writeParallelism(parallelism) + .equalityFieldColumns(equalityFieldColumns) + .upsert(insertAsUpsert) + .toBranch(branch) + .append(); + + // Execute the program. + env.execute("Test Iceberg Change-Log DataStream."); + + table.refresh(); + List snapshots = findValidSnapshots(); + int expectedSnapshotNum = expectedRecordsPerCheckpoint.size(); + assertThat(snapshots).hasSize(expectedSnapshotNum); + + for (int i = 0; i < expectedSnapshotNum; i++) { + long snapshotId = snapshots.get(i).snapshotId(); + List expectedRecords = expectedRecordsPerCheckpoint.get(i); + assertThat(actualRowSet(snapshotId, "*")) + .as("Should have the expected records for the checkpoint#" + i) + .isEqualTo(expectedRowSet(expectedRecords.toArray(new Record[0]))); + } + } + + Record record(int id, String data) { + return SimpleDataUtil.createRecord(id, data); + } + + List findValidSnapshots() { + List validSnapshots = Lists.newArrayList(); + for (Snapshot snapshot : table.snapshots()) { + if (snapshot.allManifests(table.io()).stream() + .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) { + validSnapshots.add(snapshot); + } + } + return validSnapshots; + } + + StructLikeSet expectedRowSet(Record... records) { + return SimpleDataUtil.expectedRowSet(table, records); + } + + StructLikeSet actualRowSet(long snapshotId, String... 
columns) throws IOException { + table.refresh(); + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + try (CloseableIterable reader = + IcebergGenerics.read(table).useSnapshot(snapshotId).select(columns).build()) { + reader.forEach(set::add); + } + return set; + } +} diff --git a/flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkSpark.java b/flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkSpark.java new file mode 100644 index 000000000000..c69603e2ab4e --- /dev/null +++ b/flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkSpark.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flinkspark; + +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.util.List; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.apache.flink.types.Row; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.RowFactory; + +@ExtendWith(ParameterizedTestExtension.class) +@Timeout(value = 60) +public class 
TestFlinkSpark extends TestFlinkIcebergSinkV2Base { + @RegisterExtension + public static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + MiniFlinkClusterExtension.createWithClassloaderCheckDisabled(); + + @RegisterExtension + private static final HadoopCatalogExtension CATALOG_EXTENSION = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + + @BeforeEach + public void setupTable() { + table = + CATALOG_EXTENSION + .catalog() + .createTable( + TestFixtures.TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitioned + ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build() + : PartitionSpec.unpartitioned(), + ImmutableMap.of( + TableProperties.DEFAULT_FILE_FORMAT, + format.name(), + TableProperties.FORMAT_VERSION, + String.valueOf(FORMAT_V2))); + + table + .updateProperties() + .set(TableProperties.DEFAULT_FILE_FORMAT, format.name()) + .set(TableProperties.WRITE_DISTRIBUTION_MODE, writeDistributionMode) + .commit(); + + env = + StreamExecutionEnvironment.getExecutionEnvironment( + MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100L) + .setParallelism(parallelism) + .setMaxParallelism(parallelism); + + tableLoader = CATALOG_EXTENSION.tableLoader(); + } + + @TestTemplate + public void testCheckAndGetEqualityFieldIds() { + table + .updateSchema() + .allowIncompatibleChanges() + .addRequiredColumn("type", Types.StringType.get()) + .setIdentifierFields("type") + .commit(); + + DataStream dataStream = + env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO); + FlinkSink.Builder builder = + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table); + + // Use schema identifier field IDs as equality field id list by default + assertThat(builder.checkAndGetEqualityFieldIds()) + .containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds()); + + // Use user-provided equality field column as equality field id list + builder.equalityFieldColumns(Lists.newArrayList("id")); + assertThat(builder.checkAndGetEqualityFieldIds()) + .containsExactlyInAnyOrder(table.schema().findField("id").fieldId()); + + builder.equalityFieldColumns(Lists.newArrayList("type")); + assertThat(builder.checkAndGetEqualityFieldIds()) + .containsExactlyInAnyOrder(table.schema().findField("type").fieldId()); + } + + @TestTemplate + public void testChangeLogOnIdKey() throws Exception { + testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH); + } + + @TestTemplate + public void testUpsertOnlyDeletesOnDataKey() throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of(row("+I", 1, "aaa")), + ImmutableList.of(row("-D", 1, "aaa"), row("-D", 2, "bbb"))); + + List> expectedRecords = + ImmutableList.of(ImmutableList.of(record(1, "aaa")), ImmutableList.of()); + + testChangeLogs( + ImmutableList.of("data"), + row -> row.getField(ROW_DATA_POS), + true, + elementsPerCheckpoint, + expectedRecords, + SnapshotRef.MAIN_BRANCH); + } + + @TestTemplate + public void testChangeLogOnDataKey() throws Exception { + testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH); + } + + @TestTemplate + public void testChangeLogOnIdDataKey() throws Exception { + testChangeLogOnIdDataKey(SnapshotRef.MAIN_BRANCH); + } + + @TestTemplate + public void testChangeLogOnSameKey() throws Exception { + testChangeLogOnSameKey(SnapshotRef.MAIN_BRANCH); + } + + @TestTemplate + public void testUpsertModeCheck() throws Exception { + DataStream dataStream = + env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO); + FlinkSink.Builder 
builder = + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .tableLoader(tableLoader) + .tableSchema(SimpleDataUtil.FLINK_SCHEMA) + .writeParallelism(parallelism) + .upsert(true); + + assertThatThrownBy( + () -> + builder + .equalityFieldColumns(ImmutableList.of("id", "data")) + .overwrite(true) + .append()) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream."); + + if (writeDistributionMode.equals(DistributionMode.RANGE.modeName()) && !partitioned) { + // validation error thrown from distributeDataStream + assertThatThrownBy( + () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append()) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "Invalid write distribution mode: range. Need to define sort order or partition spec."); + } else { + // validation error thrown from appendWriter + assertThatThrownBy( + () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append()) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "Equality field columns shouldn't be empty when configuring to use UPSERT data stream."); + } + } + + @TestTemplate + public void testUpsertOnIdKey() throws Exception { + testUpsertOnIdKey(SnapshotRef.MAIN_BRANCH); + } + + @TestTemplate + public void testUpsertOnDataKey() throws Exception { + testUpsertOnDataKey(SnapshotRef.MAIN_BRANCH); + } + + @TestTemplate + public void testUpsertOnIdDataKey() throws Exception { + testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH); + } + + @TestTemplate + public void testDeleteStats() throws Exception { + assumeThat(format).isNotEqualTo(FileFormat.AVRO); + + List> elementsPerCheckpoint = + ImmutableList.of( + // Checkpoint #1 + ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa"))); + + List> expectedRecords = ImmutableList.of(ImmutableList.of(record(1, "aaa"))); + + testChangeLogs( + ImmutableList.of("id", "data"), + row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), + false, + elementsPerCheckpoint, + expectedRecords, + "main"); + + DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next(); + String fromStat = + new String( + deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array()); + DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next(); + assumeThat(fromStat).isEqualTo(dataFile.path().toString()); + } + + @TestTemplate + public void testCheckAndGetEqualityTable() { + table + .updateSchema() + .allowIncompatibleChanges() + .addRequiredColumn("type", Types.StringType.get()) + .setIdentifierFields("type") + .commit(); + + DataStream dataStream = + env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO); + FlinkSink.Builder builder = + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table); + + // Use schema identifier field IDs as equality field id list by default + assertThat(builder.checkAndGetEqualityFieldIds()) + .containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds()); + + // Use user-provided equality field column as equality field id list + builder.equalityFieldColumns(Lists.newArrayList("id")); + assertThat(builder.checkAndGetEqualityFieldIds()) + .containsExactlyInAnyOrder(table.schema().findField("id").fieldId()); + + builder.equalityFieldColumns(Lists.newArrayList("type")); + assertThat(builder.checkAndGetEqualityFieldIds()) + 
.containsExactlyInAnyOrder(table.schema().findField("type").fieldId());
+  }
+
+  @TestTemplate
+  public void testEqualityDeleteWritesOnSpark() throws Exception {
+    // Step 1: Write initial data using Flink
+    DataStream<Row> dataStream = env.fromCollection(
+        ImmutableList.of(
+            row("+I", 1, "value1"),
+            row("+I", 2, "value2"),
+            row("+I", 3, "value3")
+        )
+    );
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .append();
+
+    // Execute the Flink job to write initial data
+    env.execute("Write Initial Data");
+
+    // Step 2: Apply equality deletes using Flink
+    DataStream<Row> deleteStream = env.fromCollection(
+        ImmutableList.of(
+            row("-D", 1, "value1"), // Equality delete row with id=1
+            row("-D", 2, "value2") // Equality delete row with id=2
+        )
+    );
+
+    FlinkSink.forRow(deleteStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .equalityFieldColumns(Lists.newArrayList("id", "data"))
+        .writeParallelism(parallelism)
+        .upsert(true) // Enable UPSERT mode for equality deletes
+        .append();
+
+    // Execute the Flink job to apply equality deletes
+    env.execute("Apply Equality Deletes");
+
+    // Step 3: Use Spark to read the table and verify that equality deletes were applied correctly
+
+    // Initialize SparkSession
+    SparkSession spark = SparkSession.builder()
+        .appName("IcebergSparkRead")
+        .master("local[*]")
+        .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog")
+        .config("spark.sql.catalog.hadoop_catalog.type", "hadoop")
+        .config("spark.sql.catalog.hadoop_catalog.warehouse", "file:///path/to/warehouse")
+        .getOrCreate();
+
+    // Read the table using Spark
+    Dataset<org.apache.spark.sql.Row> result = spark.read()
+        .format("iceberg")
+        .load("hadoop_catalog." + DATABASE + "." + TestFixtures.TABLE);
+
+    // Collect the result
+    List<org.apache.spark.sql.Row> actualData = result.collectAsList();
+
+    // Expected result after applying equality deletes (only id=3 should remain)
+    List<org.apache.spark.sql.Row> expectedData = ImmutableList.of(RowFactory.create(3, "value3"));
+
+    // Assert that only the row with id=3 remains in the table
+    assertThat(actualData).containsExactlyInAnyOrderElementsOf(expectedData);
+
+    // Stop the Spark session
+    spark.stop();
+  }
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 56a68c384c5f..083c3c2b8f89 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -42,6 +42,7 @@ include 'dell'
 include 'snowflake'
 include 'delta-lake'
 include 'open-api'
+include 'flink-spark-bundle'
 
 project(':bom').name = 'iceberg-bom'
 project(':api').name = 'iceberg-api'
@@ -67,6 +68,7 @@ project(':dell').name = 'iceberg-dell'
 project(':snowflake').name = 'iceberg-snowflake'
 project(':delta-lake').name = 'iceberg-delta-lake'
 project(':open-api').name = 'iceberg-open-api'
+project(':flink-spark-bundle').name = 'iceberg-flink-spark-bundle'
 
 if (null != System.getProperty("allModules")) {
   System.setProperty("flinkVersions", System.getProperty("knownFlinkVersions"))

From 3f7f2aeec342b01faf9c27952cf2fe073dabf574 Mon Sep 17 00:00:00 2001
From: adelly13
Date: Thu, 24 Oct 2024 22:12:34 -0700
Subject: [PATCH 2/2] Added Upsert unit test and fixed relocated indexing

---
 ...
TestFlinkEqualityDeleteWriteOnSpark.java} | 249 +++++------------- .../actions/TestRewriteDataFilesAction.java | 69 +++++ 2 files changed, 133 insertions(+), 185 deletions(-) rename flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/{TestFlinkSpark.java => TestFlinkEqualityDeleteWriteOnSpark.java} (50%) diff --git a/flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkSpark.java b/flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkEqualityDeleteWriteOnSpark.java similarity index 50% rename from flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkSpark.java rename to flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkEqualityDeleteWriteOnSpark.java index c69603e2ab4e..5234131248f2 100644 --- a/flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkSpark.java +++ b/flink-spark-bundle/src/test/java/org/apache/iceberg/flinkspark/TestFlinkEqualityDeleteWriteOnSpark.java @@ -30,38 +30,30 @@ import org.apache.flink.types.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.DistributionMode; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.sink.FlinkSink; -import org.apache.iceberg.flink.source.BoundedTestSource; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; -import org.apache.flink.types.Row; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.RowFactory; @ExtendWith(ParameterizedTestExtension.class) @Timeout(value = 60) -public class TestFlinkSpark extends TestFlinkIcebergSinkV2Base { +public class TestFlinkEqualityDeleteWriteOnSpark extends TestFlinkIcebergSinkV2Base { @RegisterExtension public static final MiniClusterExtension MINI_CLUSTER_EXTENSION = MiniFlinkClusterExtension.createWithClassloaderCheckDisabled(); @@ -104,182 +96,7 @@ public void setupTable() { } @TestTemplate - public void testCheckAndGetEqualityFieldIds() { - table - .updateSchema() - .allowIncompatibleChanges() - .addRequiredColumn("type", Types.StringType.get()) - .setIdentifierFields("type") - .commit(); - - DataStream dataStream = - env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO); - FlinkSink.Builder builder = - FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table); - - // Use schema identifier field IDs as equality field id list by default - assertThat(builder.checkAndGetEqualityFieldIds()) - .containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds()); - - // Use user-provided equality 
field column as equality field id list - builder.equalityFieldColumns(Lists.newArrayList("id")); - assertThat(builder.checkAndGetEqualityFieldIds()) - .containsExactlyInAnyOrder(table.schema().findField("id").fieldId()); - - builder.equalityFieldColumns(Lists.newArrayList("type")); - assertThat(builder.checkAndGetEqualityFieldIds()) - .containsExactlyInAnyOrder(table.schema().findField("type").fieldId()); - } - - @TestTemplate - public void testChangeLogOnIdKey() throws Exception { - testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH); - } - - @TestTemplate - public void testUpsertOnlyDeletesOnDataKey() throws Exception { - List> elementsPerCheckpoint = - ImmutableList.of( - ImmutableList.of(row("+I", 1, "aaa")), - ImmutableList.of(row("-D", 1, "aaa"), row("-D", 2, "bbb"))); - - List> expectedRecords = - ImmutableList.of(ImmutableList.of(record(1, "aaa")), ImmutableList.of()); - - testChangeLogs( - ImmutableList.of("data"), - row -> row.getField(ROW_DATA_POS), - true, - elementsPerCheckpoint, - expectedRecords, - SnapshotRef.MAIN_BRANCH); - } - - @TestTemplate - public void testChangeLogOnDataKey() throws Exception { - testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH); - } - - @TestTemplate - public void testChangeLogOnIdDataKey() throws Exception { - testChangeLogOnIdDataKey(SnapshotRef.MAIN_BRANCH); - } - - @TestTemplate - public void testChangeLogOnSameKey() throws Exception { - testChangeLogOnSameKey(SnapshotRef.MAIN_BRANCH); - } - - @TestTemplate - public void testUpsertModeCheck() throws Exception { - DataStream dataStream = - env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO); - FlinkSink.Builder builder = - FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) - .tableLoader(tableLoader) - .tableSchema(SimpleDataUtil.FLINK_SCHEMA) - .writeParallelism(parallelism) - .upsert(true); - - assertThatThrownBy( - () -> - builder - .equalityFieldColumns(ImmutableList.of("id", "data")) - .overwrite(true) - .append()) - .isInstanceOf(IllegalStateException.class) - .hasMessage( - "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream."); - - if (writeDistributionMode.equals(DistributionMode.RANGE.modeName()) && !partitioned) { - // validation error thrown from distributeDataStream - assertThatThrownBy( - () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append()) - .isInstanceOf(IllegalStateException.class) - .hasMessage( - "Invalid write distribution mode: range. 
Need to define sort order or partition spec."); - } else { - // validation error thrown from appendWriter - assertThatThrownBy( - () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append()) - .isInstanceOf(IllegalStateException.class) - .hasMessage( - "Equality field columns shouldn't be empty when configuring to use UPSERT data stream."); - } - } - - @TestTemplate - public void testUpsertOnIdKey() throws Exception { - testUpsertOnIdKey(SnapshotRef.MAIN_BRANCH); - } - - @TestTemplate - public void testUpsertOnDataKey() throws Exception { - testUpsertOnDataKey(SnapshotRef.MAIN_BRANCH); - } - - @TestTemplate - public void testUpsertOnIdDataKey() throws Exception { - testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH); - } - - @TestTemplate - public void testDeleteStats() throws Exception { - assumeThat(format).isNotEqualTo(FileFormat.AVRO); - - List> elementsPerCheckpoint = - ImmutableList.of( - // Checkpoint #1 - ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa"))); - - List> expectedRecords = ImmutableList.of(ImmutableList.of(record(1, "aaa"))); - - testChangeLogs( - ImmutableList.of("id", "data"), - row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), - false, - elementsPerCheckpoint, - expectedRecords, - "main"); - - DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next(); - String fromStat = - new String( - deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array()); - DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next(); - assumeThat(fromStat).isEqualTo(dataFile.path().toString()); - } - - @TestTemplate - public void testCheckAndGetEqualityTable() { - table - .updateSchema() - .allowIncompatibleChanges() - .addRequiredColumn("type", Types.StringType.get()) - .setIdentifierFields("type") - .commit(); - - DataStream dataStream = - env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO); - FlinkSink.Builder builder = - FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table); - - // Use schema identifier field IDs as equality field id list by default - assertThat(builder.checkAndGetEqualityFieldIds()) - .containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds()); - - // Use user-provided equality field column as equality field id list - builder.equalityFieldColumns(Lists.newArrayList("id")); - assertThat(builder.checkAndGetEqualityFieldIds()) - .containsExactlyInAnyOrder(table.schema().findField("id").fieldId()); - - builder.equalityFieldColumns(Lists.newArrayList("type")); - assertThat(builder.checkAndGetEqualityFieldIds()) - .containsExactlyInAnyOrder(table.schema().findField("type").fieldId()); - } - - @TestTemplate - public void testEqualityDeleteWritesOnSpark() throws Exception { + public void testEqualityDeleteWithDataStream() throws Exception { // Step 1: Write initial data using Flink DataStream dataStream = env.fromCollection( ImmutableList.of( @@ -317,6 +134,13 @@ public void testEqualityDeleteWritesOnSpark() throws Exception { // Execute the Flink job to apply equality deletes env.execute("Apply Equality Deletes"); + DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next(); + String fromStat = + new String( + deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array()); + DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next(); + 
assumeThat(fromStat).isEqualTo(dataFile.path().toString());
+
     // Step 3: Use Spark to read the table and verify that equality deletes were applied correctly
 
     // Initialize SparkSession
@@ -345,4 +169,59 @@ public void testEqualityDeleteWritesOnSpark() throws Exception {
     // Stop the Spark session
     spark.stop();
   }
+
+  @TestTemplate
+  public void testEqualityDeleteWithUpsert() throws Exception {
+    // Step 1: Create a Flink TableEnvironment to handle the SQL-based operations
+    org.apache.flink.table.api.TableEnvironment tEnv =
+        org.apache.flink.table.api.TableEnvironment.create(env.getConfiguration());
+
+    // Step 2: Create a table with a primary key and UPSERT enabled
+    tEnv.executeSql("CREATE TABLE test_table (id INT, data STRING, PRIMARY KEY(id) NOT ENFORCED) "
+        + "WITH ('format-version'='2', 'write.upsert.enabled'='true', "
+        + "'connector'='filesystem', 'path'='file:///path/to/table', 'format'='parquet')");
+
+    // Step 3: Insert initial data into the table
+    tEnv.executeSql("INSERT INTO test_table VALUES (1, 'value1'), (2, 'value2'), (3, 'value3')");
+
+    // Step 4: Perform an UPSERT operation to update an existing row and add a new one
+    tEnv.executeSql("INSERT INTO test_table VALUES (2, 'updated_value2'), (4, 'value4')");
+
+    // Step 5: Use Spark to verify the results after the UPSERT
+    SparkSession spark = SparkSession.builder()
+        .appName("IcebergSparkRead")
+        .master("local[*]")
+        .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog")
+        .config("spark.sql.catalog.hadoop_catalog.type", "hadoop")
+        .config("spark.sql.catalog.hadoop_catalog.warehouse", "file:///path/to/warehouse")
+        .getOrCreate();
+
+    // Step 6: Read the table using Spark
+    Dataset<org.apache.spark.sql.Row> result = spark.read()
+        .format("iceberg")
+        .load("hadoop_catalog." + DATABASE + "." + TestFixtures.TABLE);
+
+    // Step 7: Collect the result and verify the UPSERT operation
+    List<org.apache.spark.sql.Row> actualData = result.collectAsList();
+    List<org.apache.spark.sql.Row> expectedData = ImmutableList.of(
+        RowFactory.create(1, "value1"), // Row 1 remains unchanged
+        RowFactory.create(2, "updated_value2"), // Row 2 is updated
+        RowFactory.create(3, "value3"), // Row 3 remains unchanged
+        RowFactory.create(4, "value4") // Row 4 is newly added
+    );
+
+    // Step 8: Assert that the UPSERT operation has the expected outcome
+    assertThat(actualData).containsExactlyInAnyOrderElementsOf(expectedData);
+
+    // Stop the Spark session
+    spark.stop();
+  }
+
+  private List<String> getFilesInTable() {
+    // List the files in the table directory
+    java.io.File tableDirectory = new java.io.File("/path/to/table");
+    return java.util.Arrays.stream(java.util.Objects.requireNonNull(tableDirectory.listFiles()))
+        .map(java.io.File::getName)
+        .collect(java.util.stream.Collectors.toList());
+  }
 }
\ No newline at end of file
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index b67ee87c7d3e..3cf232c7877d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -165,6 +165,75 @@ public void testEmptyTable() {
     assertThat(table.currentSnapshot()).as("Table must stay empty").isNull();
   }
 
+  @Test
+  public void testLargeFileCompaction() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create empty table
+    int totalFiles = 0;
+
+    for (int i = 0; i < 10000; i++) {
+      // Append records to the table in each iteration
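+      // Each pass appends another large batch of data files, so the single rewrite attempted
+      // after the loop has to plan an ever larger file set; an OutOfMemoryError raised by that
+      // rewrite is the condition this test is probing for.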
+      writeRecords(10000, SCALE * 3); // Appending large files in each iteration
+      totalFiles += 10000;
+      System.out.println(totalFiles * 10000);
+      shouldHaveFiles(table, totalFiles);
+    }
+
+    try {
+      basicRewrite(table).execute();
+    } catch (OutOfMemoryError e) {
+      // Catch the OutOfMemoryError and validate that it was triggered as expected
+      System.out.println("Caught OutOfMemoryError as expected: " + e.getMessage());
+    } catch (Exception e) {
+      // Any other exception is unexpected; report it so the failure is visible
+      System.out.println("Caught unexpected exception: " + e.getMessage());
+    }
+  }
+
+  public long getMemoryUsage() {
+    Runtime runtime = Runtime.getRuntime();
+    runtime.gc(); // Suggest garbage collection before measuring
+    return runtime.totalMemory() - runtime.freeMemory();
+  }
+
+  @Test
+  public void testCompactionMemoryUsage() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create empty table
+    int totalFiles = 0;
+
+    // Write records and perform compaction job
+    for (int i = 0; i < 10000; i++) {
+      // Append records to the table in each iteration
+      long memoryBefore = getMemoryUsage(); // Memory prior to compaction
+      writeRecords(10000, SCALE * 3); // Appending large files in each iteration
+      totalFiles += 10000;
+      // Check if table has files
+      System.out.println(totalFiles);
+      // shouldHaveFiles(table, totalFiles);
+      try {
+        basicRewrite(table).execute(); // Try to execute with this many files
+        long memoryAfter = getMemoryUsage(); // Memory after compaction run
+        long memoryConsumed = memoryAfter - memoryBefore; // Amount of memory consumed for this operation
+        System.out.println("Memory used during compaction: " + memoryConsumed + " bytes");
+      } catch (OutOfMemoryError e) {
+        // Catch the OutOfMemoryError and print the most recent memory usage
+        System.out.println("Caught OutOfMemoryError as expected: " + e.getMessage());
+        long memoryAfter = getMemoryUsage(); // Memory after compaction run
+        long memoryConsumed = memoryAfter - memoryBefore; // Amount of memory consumed for this operation
+        System.out.println("Memory used during compaction: " + memoryConsumed + " bytes");
+      } catch (Exception e) {
+        // If any other exception occurs, report that the attempt failed
+        System.out.println("Caught unexpected exception: " + e.getMessage());
+      }
+    }
+  }
+
   @Test
   public void testBinPackUnpartitionedTable() {
     Table table = createTable(4);