
Commit 4e61de4

wangyum authored and dongjoon-hyun committed
[SPARK-27863][SQL] Metadata files and temporary files should not be counted as data files
## What changes were proposed in this pull request?

[`DataSourceUtils.isDataPath(path)`](https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala#L95) should be `DataSourceUtils.isDataPath(status.getPath)`: the check must run against each listed file's path, not the directory being listed. This PR fixes that issue.

## How was this patch tested?

Unit tests.

Closes apache#24725 from wangyum/SPARK-27863.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 1824cbf commit 4e61de4

4 files changed, +48 -7 lines changed
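For intuition about the bug being fixed: in `CommandUtils`, the data-file check was applied to the directory being listed rather than to each listed file. A minimal before/after sketch, assuming `DataSourceUtils.isDataPath` rejects names starting with `_` or `.` (a stand-in, not the exact Spark source):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Assumed stand-in for DataSourceUtils.isDataPath: metadata and hidden
// files (names starting with "_" or ".") are not data files.
def isDataPath(path: Path): Boolean = {
  val name = path.getName
  !(name.startsWith("_") || name.startsWith("."))
}

// Before the fix: `path` is the directory being listed, whose name is
// typically a plain table/partition name, so the check always passes and
// files like _metadata are counted toward the table size.
def sizeBefore(fs: FileSystem, path: Path): Long =
  fs.listStatus(path).map { status =>
    if (isDataPath(path)) status.getLen else 0L
  }.sum

// After the fix: each child's own path is checked instead.
def sizeAfter(fs: FileSystem, path: Path): Long =
  fs.listStatus(path).map { status =>
    if (isDataPath(status.getPath)) status.getLen else 0L
  }.sum
```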

sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala

+6 -5

@@ -64,9 +64,7 @@ object CommandUtils extends Logging {
         val paths = partitions.map(x => new Path(x.storage.locationUri.get))
         val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
         val pathFilter = new PathFilter with Serializable {
-          override def accept(path: Path): Boolean = {
-            DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir)
-          }
+          override def accept(path: Path): Boolean = isDataPath(path, stagingDir)
         }
         val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
           paths, sessionState.newHadoopConf(), pathFilter, spark)
@@ -98,8 +96,7 @@ object CommandUtils extends Logging {
     val size = if (fileStatus.isDirectory) {
       fs.listStatus(path)
         .map { status =>
-          if (!status.getPath.getName.startsWith(stagingDir) &&
-              DataSourceUtils.isDataPath(path)) {
+          if (isDataPath(status.getPath, stagingDir)) {
             getPathSize(fs, status.getPath)
           } else {
             0L
@@ -343,4 +340,8 @@ object CommandUtils extends Logging {
       cs.copy(histogram = Some(histogram))
     }
   }
+
+  private def isDataPath(path: Path, stagingDir: String): Boolean = {
+    !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
+  }
 }
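A side note on the first hunk: the filter is built as `new PathFilter with Serializable`, presumably because `InMemoryFileIndex.bulkListLeafFiles` can run the listing as a distributed job and ship the filter to executors. A hedged sketch of that shape (the `_`/`.` prefix rule mirrors the assumed `DataSourceUtils.isDataPath` behavior above):

```scala
import org.apache.hadoop.fs.{Path, PathFilter}

// Sketch only: a filter that must be serializable because it may be
// shipped to executors during parallel file listing.
val stagingDir = ".hive-staging"  // default for hive.exec.stagingdir
val pathFilter = new PathFilter with Serializable {
  override def accept(path: Path): Boolean = {
    val name = path.getName
    !name.startsWith(stagingDir) && !name.startsWith("_") && !name.startsWith(".")
  }
}
```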

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

+37 -1

@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql
 
-import java.io.File
+import java.io.{File, PrintWriter}
+import java.net.URI
 import java.util.TimeZone
 import java.util.concurrent.TimeUnit
 
@@ -614,4 +615,39 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSQLContext {
       }
     }
   }
+
+  test("Metadata files and temporary files should not be counted as data files") {
+    withTempDir { tempDir =>
+      val tableName = "t1"
+      val stagingDirName = ".test-staging-dir"
+      val tableLocation = s"${tempDir.toURI}/$tableName"
+      withSQLConf(
+        SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true",
+        "hive.exec.stagingdir" -> stagingDirName) {
+        withTable("t1") {
+          sql(s"CREATE TABLE $tableName(c1 BIGINT) USING PARQUET LOCATION '$tableLocation'")
+          sql(s"INSERT INTO TABLE $tableName VALUES(1)")
+
+          val staging = new File(new URI(s"$tableLocation/$stagingDirName"))
+          Utils.tryWithResource(new PrintWriter(staging)) { stagingWriter =>
+            stagingWriter.write("12")
+          }
+
+          val metadata = new File(new URI(s"$tableLocation/_metadata"))
+          Utils.tryWithResource(new PrintWriter(metadata)) { metadataWriter =>
+            metadataWriter.write("1234")
+          }
+
+          sql(s"INSERT INTO TABLE $tableName VALUES(1)")
+
+          val stagingFileSize = staging.length()
+          val metadataFileSize = metadata.length()
+          val tableLocationSize = getDataSize(new File(new URI(tableLocation)))
+
+          val stats = checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = None)
+          assert(stats.get.sizeInBytes === tableLocationSize - stagingFileSize - metadataFileSize)
+        }
+      }
+    }
+  }
 }

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala

+4

@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.{lang => jl}
+import java.io.File
 import java.sql.{Date, Timestamp}
 import java.util.concurrent.TimeUnit
 
@@ -294,6 +295,9 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
     }
   }
 
+  def getDataSize(file: File): Long =
+    file.listFiles.filter(!_.getName.endsWith(".crc")).map(_.length).sum
+
   // This test will be run twice: with and without Hive support
   test("SPARK-18856: non-empty partitioned table should not report zero size") {
     withTable("ds_tbl", "hive_tbl") {
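A quick usage sketch of the shared `getDataSize` helper (setup assumed, not from the PR): on a local filesystem Hadoop writes a `.crc` checksum next to each file, and those checksums should not count as table data.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical directory layout: one 10-byte data file plus a 4-byte
// Hadoop checksum file.
val dir = Files.createTempDirectory("stats-demo").toFile
Files.write(new File(dir, "part-00000.parquet").toPath, new Array[Byte](10))
Files.write(new File(dir, ".part-00000.parquet.crc").toPath, new Array[Byte](4))

// getDataSize filters out anything ending in ".crc", so only the data
// file's 10 bytes are counted.
assert(getDataSize(dir) == 10)
```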

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

+1 -1

@@ -120,7 +120,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
     withTempDir { tempDir =>
       withTable("t1") {
         spark.range(5).write.mode(SaveMode.Overwrite).parquet(tempDir.getCanonicalPath)
-        val dataSize = tempDir.listFiles.filter(!_.getName.endsWith(".crc")).map(_.length).sum
+        val dataSize = getDataSize(tempDir)
         spark.sql(
           s"""
              |CREATE EXTERNAL TABLE t1(id BIGINT)
