
Commit 324c534

#415 Add data source writer skeleton.
1 parent 77dc2fd

File tree

2 files changed, +79 -2 lines changed

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 34 additions & 2 deletions
@@ -17,9 +17,9 @@
 package za.co.absa.cobrix.spark.cobol.source
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, SchemaRelationProvider}
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 import za.co.absa.cobrix.cobol.internal.Logging
 import za.co.absa.cobrix.cobol.reader.parameters.CobolParameters
 import za.co.absa.cobrix.spark.cobol.parameters.CobolParametersParser._
@@ -35,6 +35,7 @@ import za.co.absa.cobrix.spark.cobol.utils.{BuildProperties, SparkUtils}
 class DefaultSource
   extends RelationProvider
     with SchemaRelationProvider
+    with CreatableRelationProvider
     with DataSourceRegister
     with ReaderFactory
     with Logging {
@@ -45,6 +46,7 @@ class DefaultSource
     createRelation(sqlContext, parameters, null)
   }
 
+  /** Reader relation */
   override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
     CobolParametersValidator.validateOrThrow(parameters, sqlContext.sparkSession.sparkContext.hadoopConfiguration)
 
@@ -59,6 +61,36 @@ class DefaultSource
       cobolParameters.debugIgnoreFileSize)(sqlContext)
   }
 
+  /** Writer relation */
+  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
+    val path = parameters.getOrElse("path",
+      throw new IllegalArgumentException("Path is required for this data source."))
+
+    mode match {
+      case SaveMode.Overwrite =>
+        val outputPath = new Path(path)
+        val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+        val fs = outputPath.getFileSystem(hadoopConf)
+        if (fs.exists(outputPath)) {
+          fs.delete(outputPath, true)
+        }
+      case SaveMode.Append =>
+      case _ =>
+    }
+
+    // Simply save each row as comma-separated values in a text file
+    data.rdd
+      .map(row => row.mkString(","))
+      .saveAsTextFile(path)
+
+    new BaseRelation {
+      override def sqlContext: SQLContext = sqlContext
+
+      override def schema: StructType = data.schema
+    }
+  }
+
   //TODO fix with the correct implementation once the correct Reader hierarchy is put in place.
   override def buildReader(spark: SparkSession, parameters: Map[String, String]): FixedLenReader = null

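With CreatableRelationProvider mixed in, Spark's DataFrameWriter can dispatch to this source through its registered short name. A minimal usage sketch (the output path and the df variable are illustrative, not from the commit):

    import org.apache.spark.sql.SaveMode

    // Spark resolves "cobol" via DataSourceRegister and calls the
    // writer createRelation above with the chosen SaveMode.
    df.write
      .format("cobol")
      .mode(SaveMode.Overwrite) // takes the delete-then-write branch
      .save("/tmp/cobol-writer-out")

One caveat in the skeleton as committed: inside the anonymous BaseRelation, the sqlContext member shadows the method parameter of the same name, so the override refers to itself and would recurse if ever called. A sketch of a fix, capturing the outer reference under a hypothetical name first:

    // Sketch: capture the parameter before the member shadows it.
    val outerSqlContext = sqlContext
    new BaseRelation {
      override def sqlContext: SQLContext = outerSqlContext
      override def schema: StructType = data.schema
    }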
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/WriterSourceSpec.scala

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.source
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
+import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
+import za.co.absa.cobrix.spark.cobol.utils.FileUtils
+
+class WriterSourceSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture {
+
+  import spark.implicits._
+
+  "writer" should {
+    "be able to write a basic dataframe" in {
+      withTempDirectory("writer") { tempDir =>
+        val df = List(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b")
+
+        val outputPath = new Path(tempDir, "test")
+
+        df.write.format("cobol").save(outputPath.toString)
+
+        val files = FileUtils.getFiles(outputPath.toString, spark.sparkContext.hadoopConfiguration)
+
+        assert(files.nonEmpty)
+      }
+    }
+  }
+}
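Since the writer serializes each row as comma-joined text via saveAsTextFile, the test's output can also be read back with Spark's plain text reader. A follow-up sketch using the test's own names (not part of the commit):

    // Hypothetical extra assertion: read back what the skeleton wrote.
    // Each line should be a comma-joined row, e.g. "A,1".
    val written = spark.read.text(outputPath.toString)
    assert(written.count() == 3)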
