
Commit 3663dbe

gengliangwang authored and dongjoon-hyun committed
[SPARK-28218][SQL] Migrate Avro to File Data Source V2
## What changes were proposed in this pull request?

Migrate Avro to File source V2.

## How was this patch tested?

Unit test

Closes apache#25017 from gengliangwang/avroV2.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 88cd6dc commit 3663dbe

12 files changed, +561 -124 lines
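For orientation, a minimal sketch of the user-facing API, which this migration leaves unchanged: after this commit the `avro` short name resolves to `AvroDataSourceV2` instead of `AvroFileFormat`, but the read/write calls are identical. The app name and paths below are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("avro-v2-demo") // hypothetical app name
  .getOrCreate()

// Same calls before and after this commit; only the class resolved
// behind the "avro" short name changes.
val df = spark.read.format("avro").load("/tmp/events") // hypothetical path
df.write.format("avro").save("/tmp/events_copy")       // hypothetical path
```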
external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

```diff
@@ -1 +1 @@
-org.apache.spark.sql.avro.AvroFileFormat
+org.apache.spark.sql.v2.avro.AvroDataSourceV2
```
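This one-line registration change is what reroutes the `avro` short name: Spark resolves `format("avro")` by scanning `META-INF/services` entries for `DataSourceRegister` implementations and matching on `shortName()`. A simplified sketch of that lookup using plain `java.util.ServiceLoader`; `lookupByShortName` is a hypothetical helper mirroring what `DataSource.lookupDataSource` does, not Spark's actual method:

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.DataSourceRegister

// Hypothetical helper: scan registered DataSourceRegister implementations
// on the classpath and match the requested short name case-insensitively.
def lookupByShortName(name: String): Option[Class[_]] = {
  val loader = Thread.currentThread().getContextClassLoader
  ServiceLoader.load(classOf[DataSourceRegister], loader)
    .asScala
    .find(_.shortName().equalsIgnoreCase(name))
    .map(_.getClass)
}

// After this commit, this should resolve to AvroDataSourceV2.
lookupByShortName("avro")
```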

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+7 -116

```diff
@@ -23,25 +23,23 @@ import java.net.URI
 import scala.util.control.NonFatal
 
 import org.apache.avro.Schema
-import org.apache.avro.file.DataFileConstants._
 import org.apache.avro.file.DataFileReader
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
-import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
-import org.apache.avro.mapreduce.AvroJob
+import org.apache.avro.mapred.FsInput
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
 
-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration
 
-private[avro] class AvroFileFormat extends FileFormat
+private[sql] class AvroFileFormat extends FileFormat
   with DataSourceRegister with Logging with Serializable {
 
   override def equals(other: Any): Boolean = other match {
@@ -56,74 +54,7 @@ private[avro] class AvroFileFormat extends FileFormat
       spark: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    val conf = spark.sessionState.newHadoopConf()
-    if (options.contains("ignoreExtension")) {
-      logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " +
-        "general data source option pathGlobFilter for filtering file names.")
-    }
-    val parsedOptions = new AvroOptions(options, conf)
-
-    // User can specify an optional avro json schema.
-    val avroSchema = parsedOptions.schema
-      .map(new Schema.Parser().parse)
-      .getOrElse {
-        inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
-          spark.sessionState.conf.ignoreCorruptFiles)
-      }
-
-    SchemaConverters.toSqlType(avroSchema).dataType match {
-      case t: StructType => Some(t)
-      case _ => throw new RuntimeException(
-        s"""Avro schema cannot be converted to a Spark SQL StructType:
-           |
-           |${avroSchema.toString(true)}
-           |""".stripMargin)
-    }
-  }
-
-  private def inferAvroSchemaFromFiles(
-      files: Seq[FileStatus],
-      conf: Configuration,
-      ignoreExtension: Boolean,
-      ignoreCorruptFiles: Boolean): Schema = {
-    // Schema evolution is not supported yet. Here we only pick first random readable sample file to
-    // figure out the schema of the whole dataset.
-    val avroReader = files.iterator.map { f =>
-      val path = f.getPath
-      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
-        None
-      } else {
-        Utils.tryWithResource {
-          new FsInput(path, conf)
-        } { in =>
-          try {
-            Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
-          } catch {
-            case e: IOException =>
-              if (ignoreCorruptFiles) {
-                logWarning(s"Skipped the footer in the corrupted file: $path", e)
-                None
-              } else {
-                throw new SparkException(s"Could not read file: $path", e)
-              }
-          }
-        }
-      }
-    }.collectFirst {
-      case Some(reader) => reader
-    }
-
-    avroReader match {
-      case Some(reader) =>
-        try {
-          reader.getSchema
-        } finally {
-          reader.close()
-        }
-      case None =>
-        throw new FileNotFoundException(
-          "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
-    }
+    AvroUtils.inferSchema(spark, options, files)
   }
 
   override def shortName(): String = "avro"
@@ -140,32 +71,7 @@ private[avro] class AvroFileFormat
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())
-    val outputAvroSchema: Schema = parsedOptions.schema
-      .map(new Schema.Parser().parse)
-      .getOrElse(SchemaConverters.toAvroType(dataSchema, nullable = false,
-        parsedOptions.recordName, parsedOptions.recordNamespace))
-
-    AvroJob.setOutputKeySchema(job, outputAvroSchema)
-
-    if (parsedOptions.compression == "uncompressed") {
-      job.getConfiguration.setBoolean("mapred.output.compress", false)
-    } else {
-      job.getConfiguration.setBoolean("mapred.output.compress", true)
-      logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec")
-      val codec = parsedOptions.compression match {
-        case DEFLATE_CODEC =>
-          val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-          logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.")
-          job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
-          DEFLATE_CODEC
-        case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec
-        case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown")
-      }
-      job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec)
-    }
-
-    new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
+    AvroUtils.prepareWrite(spark.sessionState.conf, job, options, dataSchema)
   }
 
   override def buildReader(
@@ -250,22 +156,7 @@ private[avro] class AvroFileFormat
     }
   }
 
-  override def supportDataType(dataType: DataType): Boolean = dataType match {
-    case _: AtomicType => true
-
-    case st: StructType => st.forall { f => supportDataType(f.dataType) }
-
-    case ArrayType(elementType, _) => supportDataType(elementType)
-
-    case MapType(keyType, valueType, _) =>
-      supportDataType(keyType) && supportDataType(valueType)
-
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
-
-    case _: NullType => true
-
-    case _ => false
-  }
+  override def supportDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)
 }
 
 private[avro] object AvroFileFormat {
```
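The compression handling that moves out of `prepareWrite` here (into `AvroUtils`, below) is driven by the `compression` write option and the `spark.sql.avro.deflate.level` session conf. A brief usage sketch, reusing the `spark` and `df` from the first example (path hypothetical):

```scala
// "deflate" maps to DEFLATE_CODEC in the codec match above and reads its
// level from the SQL conf; the other accepted values are uncompressed,
// snappy, bzip2, and xz.
spark.conf.set("spark.sql.avro.deflate.level", "5")
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("/tmp/events_deflate") // hypothetical path
```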

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
+1 -1

```diff
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
  * @param catalystSchema Catalyst schema of input data.
  * @param avroSchemaAsJsonString Avro schema of output result, in JSON string format.
  */
-private[avro] class AvroOutputWriterFactory(
+private[sql] class AvroOutputWriterFactory(
     catalystSchema: StructType,
     avroSchemaAsJsonString: String) extends OutputWriterFactory {
 
```
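The widening from `private[avro]` to `private[sql]` here mirrors the same change to `AvroFileFormat` above: the new `org.apache.spark.sql.v2.avro` package sits outside `org.apache.spark.sql.avro`, so V2 code such as `AvroDataSourceV2`'s `fallbackFileFormat` reference (below) could not otherwise see these classes.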

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala (new file, +163 lines)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.avro

import java.io.{FileNotFoundException, IOException}

import org.apache.avro.Schema
import org.apache.avro.file.DataFileConstants.{BZIP2_CODEC, DEFLATE_CODEC, SNAPPY_CODEC, XZ_CODEC}
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

object AvroUtils extends Logging {
  def inferSchema(
      spark: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
    val conf = spark.sessionState.newHadoopConf()
    if (options.contains("ignoreExtension")) {
      logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " +
        "general data source option pathGlobFilter for filtering file names.")
    }
    val parsedOptions = new AvroOptions(options, conf)

    // User can specify an optional avro json schema.
    val avroSchema = parsedOptions.schema
      .map(new Schema.Parser().parse)
      .getOrElse {
        inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
          spark.sessionState.conf.ignoreCorruptFiles)
      }

    SchemaConverters.toSqlType(avroSchema).dataType match {
      case t: StructType => Some(t)
      case _ => throw new RuntimeException(
        s"""Avro schema cannot be converted to a Spark SQL StructType:
           |
           |${avroSchema.toString(true)}
           |""".stripMargin)
    }
  }

  def supportsDataType(dataType: DataType): Boolean = dataType match {
    case _: AtomicType => true

    case st: StructType => st.forall { f => supportsDataType(f.dataType) }

    case ArrayType(elementType, _) => supportsDataType(elementType)

    case MapType(keyType, valueType, _) =>
      supportsDataType(keyType) && supportsDataType(valueType)

    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)

    case _: NullType => true

    case _ => false
  }

  def prepareWrite(
      sqlConf: SQLConf,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    val parsedOptions = new AvroOptions(options, job.getConfiguration)
    val outputAvroSchema: Schema = parsedOptions.schema
      .map(new Schema.Parser().parse)
      .getOrElse(SchemaConverters.toAvroType(dataSchema, nullable = false,
        parsedOptions.recordName, parsedOptions.recordNamespace))

    AvroJob.setOutputKeySchema(job, outputAvroSchema)

    if (parsedOptions.compression == "uncompressed") {
      job.getConfiguration.setBoolean("mapred.output.compress", false)
    } else {
      job.getConfiguration.setBoolean("mapred.output.compress", true)
      logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec")
      val codec = parsedOptions.compression match {
        case DEFLATE_CODEC =>
          val deflateLevel = sqlConf.avroDeflateLevel
          logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.")
          job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
          DEFLATE_CODEC
        case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec
        case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown")
      }
      job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec)
    }

    new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
  }

  private def inferAvroSchemaFromFiles(
      files: Seq[FileStatus],
      conf: Configuration,
      ignoreExtension: Boolean,
      ignoreCorruptFiles: Boolean): Schema = {
    // Schema evolution is not supported yet. Here we only pick first random readable sample file to
    // figure out the schema of the whole dataset.
    val avroReader = files.iterator.map { f =>
      val path = f.getPath
      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
        None
      } else {
        Utils.tryWithResource {
          new FsInput(path, conf)
        } { in =>
          try {
            Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
          } catch {
            case e: IOException =>
              if (ignoreCorruptFiles) {
                logWarning(s"Skipped the footer in the corrupted file: $path", e)
                None
              } else {
                throw new SparkException(s"Could not read file: $path", e)
              }
          }
        }
      }
    }.collectFirst {
      case Some(reader) => reader
    }

    avroReader match {
      case Some(reader) =>
        try {
          reader.getSchema
        } finally {
          reader.close()
        }
      case None =>
        throw new FileNotFoundException(
          "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
    }
  }
}
```
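`inferSchema` samples the first readable file only when no schema is supplied; passing the `avroSchema` option makes `parsedOptions.schema` non-empty and short-circuits `inferAvroSchemaFromFiles` entirely. A sketch, reusing `spark` from the first example (the record schema and path are hypothetical):

```scala
// Supplying an Avro JSON schema up front means no data files are opened
// for inference.
val avroSchemaJson =
  """{
    |  "type": "record",
    |  "name": "Event",
    |  "fields": [
    |    {"name": "id", "type": "long"},
    |    {"name": "payload", "type": ["null", "string"], "default": null}
    |  ]
    |}""".stripMargin

val typed = spark.read
  .format("avro")
  .option("avroSchema", avroSchemaJson)
  .load("/tmp/events") // hypothetical path
```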
external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala (new file, +43 lines)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.v2.avro

import org.apache.spark.sql.avro.AvroFileFormat
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class AvroDataSourceV2 extends FileDataSourceV2 {

  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[AvroFileFormat]

  override def shortName(): String = "avro"

  override def getTable(options: CaseInsensitiveStringMap): Table = {
    val paths = getPaths(options)
    val tableName = getTableName(paths)
    AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
  }

  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
    val paths = getPaths(options)
    val tableName = getTableName(paths)
    AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
  }
}
```
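`fallbackFileFormat` is what ties the V2 source back to the V1 implementation: file sources listed in the session's "use V1 source" conf are served by the fallback `FileFormat` rather than the `Table` returned by `getTable`. A hedged sketch, reusing `spark` from the first example; the conf keys below existed on master around the time of this commit but were later consolidated into `spark.sql.sources.useV1SourceList`, so treat the exact keys as an assumption:

```scala
// Exclude avro from file source V2 reads and writes (conf keys assumed
// from this commit's era; they were consolidated in later releases).
spark.conf.set("spark.sql.sources.read.useV1SourceList", "avro")
spark.conf.set("spark.sql.sources.write.useV1SourceList", "avro")

// With "avro" listed, format("avro") is served by fallbackFileFormat,
// i.e. the V1 AvroFileFormat, instead of AvroTable.
val v1Df = spark.read.format("avro").load("/tmp/events") // hypothetical path
```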
