Skip to content

Commit 808d9d0

Browse files
dbtsai and lindblombr
committed
[SPARK-27762][SQL] Support user provided avro schema for writing fields with different ordering
## What changes were proposed in this pull request? Spark Avro reader supports reading avro files with provided schema with different field orderings. However, the avro writer doesn't support this feature. This PR enables the Spark avro writer with this feature. ## How was this patch tested? New test is added. Closes apache#24635 from dbtsai/avroFix. Lead-authored-by: DB Tsai <[email protected]> Co-authored-by: Brian Lindblom <[email protected]> Signed-off-by: DB Tsai <[email protected]>
1 parent 9e73be3 commit 808d9d0

File tree

2 files changed

+67
-10
lines changed

2 files changed

+67
-10
lines changed

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala

+16-6
Original file line numberDiff line numberDiff line change
@@ -205,18 +205,28 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
205205
throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
206206
s"Avro type $avroStruct.")
207207
}
208-
val fieldConverters = catalystStruct.zip(avroStruct.getFields.asScala).map {
209-
case (f1, f2) => newConverter(f1.dataType, resolveNullableType(f2.schema(), f1.nullable))
210-
}
208+
209+
val (avroIndices: Array[Int], fieldConverters: Array[Converter]) =
210+
catalystStruct.map { catalystField =>
211+
val avroField = avroStruct.getField(catalystField.name)
212+
if (avroField == null) {
213+
throw new IncompatibleSchemaException(
214+
s"Cannot convert Catalyst type $catalystStruct to Avro type $avroStruct.")
215+
}
216+
val converter = newConverter(catalystField.dataType, resolveNullableType(
217+
avroField.schema(), catalystField.nullable))
218+
(avroField.pos(), converter)
219+
}.toArray.unzip
220+
211221
val numFields = catalystStruct.length
212-
(row: InternalRow) =>
222+
row: InternalRow =>
213223
val result = new Record(avroStruct)
214224
var i = 0
215225
while (i < numFields) {
216226
if (row.isNullAt(i)) {
217-
result.put(i, null)
227+
result.put(avroIndices(i), null)
218228
} else {
219-
result.put(i, fieldConverters(i).apply(row, i))
229+
result.put(avroIndices(i), fieldConverters(i).apply(row, i))
220230
}
221231
i += 1
222232
}

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

+51-4
Original file line numberDiff line numberDiff line change
@@ -692,9 +692,9 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
692692
| "type" : "record",
693693
| "name" : "test_schema",
694694
| "fields" : [{
695-
| "name": "enum",
695+
| "name": "Suit",
696696
| "type": [{ "type": "enum",
697-
| "name": "Suit",
697+
| "name": "SuitEnumType",
698698
| "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
699699
| }, "null"]
700700
| }]
@@ -734,9 +734,9 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
734734
| "type" : "record",
735735
| "name" : "test_schema",
736736
| "fields" : [{
737-
| "name": "enum",
737+
| "name": "Suit",
738738
| "type": { "type": "enum",
739-
| "name": "Suit",
739+
| "name": "SuitEnumType",
740740
| "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
741741
| }
742742
| }]
@@ -883,6 +883,53 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
883883
}
884884
}
885885

886+
test("support user provided avro schema for writing / reading fields with different ordering") {
887+
withTempPath { tempDir =>
888+
val avroSchema =
889+
"""
890+
|{
891+
| "type" : "record",
892+
| "name" : "test_schema",
893+
| "fields" : [
894+
| {"name": "Age", "type": "int"},
895+
| {"name": "Name", "type": "string"}
896+
| ]
897+
|}
898+
""".stripMargin
899+
900+
val avroSchemaReversed =
901+
"""
902+
|{
903+
| "type" : "record",
904+
| "name" : "test_schema",
905+
| "fields" : [
906+
| {"name": "Name", "type": "string"},
907+
| {"name": "Age", "type": "int"}
908+
| ]
909+
|}
910+
""".stripMargin
911+
912+
val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(2, "Aurora"))),
913+
StructType(Seq(
914+
StructField("Age", IntegerType, false),
915+
StructField("Name", StringType, false))))
916+
917+
val tempSaveDir = s"$tempDir/save/"
918+
919+
// Writing avro file with reversed field ordering
920+
df.write.format("avro").option("avroSchema", avroSchemaReversed).save(tempSaveDir)
921+
922+
// Reading reversed avro file
923+
checkAnswer(df.select("Name", "Age"), spark.read.format("avro").load(tempSaveDir))
924+
checkAvroSchemaEquals(avroSchemaReversed, getAvroSchemaStringFromFiles(tempSaveDir))
925+
926+
// Reading reversed avro file with provided original schema
927+
val avroDf = spark.read.format("avro").option("avroSchema", avroSchema).load(tempSaveDir)
928+
checkAnswer(df, avroDf)
929+
assert(avroDf.schema.fieldNames.sameElements(Array("Age", "Name")))
930+
}
931+
}
932+
886933
test("error handling for unsupported Interval data types") {
887934
withTempDir { dir =>
888935
val tempDir = new File(dir, "files").getCanonicalPath

0 commit comments

Comments (0)