Skip to content

Commit 6dcf09b

Browse files
committed
[SPARK-27971][SQL][R] MapPartitionsInRWithArrowExec.evaluate shouldn't eagerly read the first batch
## What changes were proposed in this pull request? This PR is the same fix as apache#24816 but in vectorized `dapply` in SparkR. ## How was this patch tested? Manually tested. Closes apache#24818 from HyukjinKwon/SPARK-27971. Authored-by: HyukjinKwon <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
1 parent e561e92 commit 6dcf09b

File tree

1 file changed

+5
-22
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution

1 file changed

+5
-22
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala

+5-22
Original file line numberDiff line numberDiff line change
@@ -243,28 +243,11 @@ case class MapPartitionsInRWithArrowExec(
243243
// binary in a batch due to the limitation of R API. See also ARROW-4512.
244244
val columnarBatchIter = runner.compute(batchIter, -1)
245245
val outputProject = UnsafeProjection.create(output, output)
246-
new Iterator[InternalRow] {
247-
248-
private var currentIter = if (columnarBatchIter.hasNext) {
249-
val batch = columnarBatchIter.next()
250-
val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
251-
assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): " +
252-
s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
253-
batch.rowIterator.asScala
254-
} else {
255-
Iterator.empty
256-
}
257-
258-
override def hasNext: Boolean = currentIter.hasNext || {
259-
if (columnarBatchIter.hasNext) {
260-
currentIter = columnarBatchIter.next().rowIterator.asScala
261-
hasNext
262-
} else {
263-
false
264-
}
265-
}
266-
267-
override def next(): InternalRow = currentIter.next()
246+
columnarBatchIter.flatMap { batch =>
247+
val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
248+
assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): " +
249+
s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
250+
batch.rowIterator.asScala
268251
}.map(outputProject)
269252
}
270253
}

0 commit comments

Comments
 (0)