diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index b67ee87c7d3e..5997a2e0096d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -164,6 +164,143 @@ public void testEmptyTable() {
     assertThat(table.currentSnapshot()).as("Table must stay empty").isNull();
   }
 
+  @Test
+  public void testCompactionMemoryUsage() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create empty table
+    int totalFiles = 0;
+
+    // Write records and run a compaction job in each iteration
+    for (int i = 0; i < 10000; i++) {
+      long memoryBefore = getMemoryUsage(); // memory prior to compaction
+      writeRecords(10000, SCALE * 3); // append large files in each iteration
+      totalFiles += 10000;
+      // Track how many files have been written so far
+      System.out.println(totalFiles);
+      // shouldHaveFiles(table, totalFiles);
+      try {
+        basicRewrite(table).execute(); // try to compact with this many files
+        long memoryAfter = getMemoryUsage(); // memory after the compaction run
+        long memoryConsumed = memoryAfter - memoryBefore; // memory consumed by this operation
+        System.out.println("Memory used during compaction: " + memoryConsumed + " bytes");
+      } catch (OutOfMemoryError e) {
+        // Catch the OutOfMemoryError and report the most recent memory usage
+        System.out.println("Caught OutOfMemoryError as expected: " + e.getMessage());
+        long memoryAfter = getMemoryUsage(); // memory after the failed compaction run
+        long memoryConsumed = memoryAfter - memoryBefore; // memory consumed by this operation
+        System.out.println("Memory used during compaction: " + memoryConsumed + " bytes");
+      } catch (Exception e) {
+        // Any other exception means the run failed for a different reason
+        System.out.println("Caught: " + e.getMessage());
+      }
+    }
+  }
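+
+  // Note: getMemoryUsage() is called above but not defined elsewhere in this diff. The helper
+  // below is an assumed, minimal sketch that approximates current heap usage via the JVM Runtime
+  // so the test compiles; a more precise source (e.g. MemoryMXBean) could be swapped in instead.
+  private long getMemoryUsage() {
+    Runtime runtime = Runtime.getRuntime();
+    return runtime.totalMemory() - runtime.freeMemory();
+  }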
+
+  // Read-and-compact helper: refreshes the table to the newest snapshot, then returns a
+  // position-delete rewrite action for it
+  private RewritePositionDeleteFilesSparkAction basicRewritePositionalDeletes(Table curTable) {
+    curTable.refresh(); // update the table to the newest snapshot
+    return actions().rewritePositionDeletes(curTable);
+  }
+
+  @Test
+  public void testCompactionSdataLdelete() throws Exception {
+    PartitionSpec spec = PartitionSpec.unpartitioned(); // determines how we are partitioning
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create empty table
+
+    // Create a large number of records
+    List<Record> records = Lists.newArrayList();
+    GenericRecord record = GenericRecord.create(table.schema());
+    for (int i = 1; i <= 100_000; i++) {
+      records.add(record.copy("id", i, "data", "data_" + i));
+    }
+
+    // Create a single data file with all the records
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            null, // unpartitioned table, so no partition
+            records);
+    table.newAppend().appendFile(dataFile).commit(); // commit a snapshot of the table
+
+    // Create a lot of equality deletes against the "data" column
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList();
+    for (int i = 1; i <= 50_000; i++) {
+      dataDeletes.add(dataDelete.copy("data", "data_" + i));
+    }
+
+    // Write many delete files, each carrying the full set of deletes
+    RowDelta rowDelta = table.newRowDelta();
+    for (int i = 1; i <= 50_000; i++) {
+      DeleteFile eqDeletes =
+          FileHelpers.writeDeleteFile(
+              table,
+              Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+              null, // unpartitioned table, so no partition
+              dataDeletes,
+              deleteRowSchema);
+      rowDelta.addDeletes(eqDeletes);
+    }
+    rowDelta.commit();
+
+    // Compaction job
+    basicRewritePositionalDeletes(table).execute();
+  }
+
+  @Test
+  public void testCompactionLdataLdeletes() throws Exception {
+    PartitionSpec spec = PartitionSpec.unpartitioned(); // determines how we are partitioning
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create empty table
+
+    // Create a large number of records
+    List<Record> records = Lists.newArrayList();
+    GenericRecord record = GenericRecord.create(table.schema());
+    for (int i = 1; i <= 100_000; i++) {
+      records.add(record.copy("id", i, "data", "data_" + i));
+    }
+
+    // Create many data files, each containing all the records
+    AppendFiles recAppend = table.newAppend();
+    for (int i = 1; i <= 100_000; i++) {
+      DataFile dataFile =
+          FileHelpers.writeDataFile(
+              table,
+              Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+              null, // unpartitioned table, so no partition
+              records);
+      recAppend.appendFile(dataFile);
+    }
+    recAppend.commit(); // commit a snapshot of the table
+
+    // Create a lot of equality deletes against the "data" column
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList();
+    for (int i = 1; i <= 50_000; i++) {
+      dataDeletes.add(dataDelete.copy("data", "data_" + i));
+    }
+
+    // Write many delete files, each carrying the full set of deletes
+    RowDelta rowDelta = table.newRowDelta();
+    for (int i = 1; i <= 50_000; i++) {
+      DeleteFile eqDeletes =
+          FileHelpers.writeDeleteFile(
+              table,
+              Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+              null, // unpartitioned table, so no partition
+              dataDeletes,
+              deleteRowSchema);
+      rowDelta.addDeletes(eqDeletes);
+    }
+    rowDelta.commit();
+
+    // Compaction job
+    basicRewritePositionalDeletes(table).execute();
+  }
+
   @Test
   public void testBinPackUnpartitionedTable() {