Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
grpcObserver: StreamObserver[T])
extends Logging { self =>

// the executionObserver object is used as a synchronization lock between the
// ExecuteGrpcResponseSender consumer and ExecuteResponseObserver producer.
// The executionObserver object is where the responses are consumed. A lock is
// used for synchronization between the ExecuteGrpcResponseSender consumer and
// ExecuteResponseObserver producer.
private val executionObserver = executeHolder.responseObserver
.asInstanceOf[ExecuteResponseObserver[T]]

Expand Down Expand Up @@ -85,12 +86,12 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
// In reattachable execution we use setOnReadyHandler and grpcCallObserver.isReady to control
// backpressure. See sendResponse.
//
// Because calls to OnReadyHandler get queued on the same GRPC inboud queue as the executePlan
// or reattachExecute RPC handler that this is executing in, they will not arrive and not
// trigger the OnReadyHandler unless this thread returns from executePlan/reattachExecute.
// Therefore, we launch another thread to operate on the grpcObserver and send the responses,
// while this thread will exit from the executePlan/reattachExecute call, allowing GRPC
// to send the OnReady events.
// Because calls to OnReadyHandler get queued on the same GRPC inbound queue as the
// executePlan or reattachExecute RPC handler that this is executing in, they will not arrive
// and will not trigger the OnReadyHandler unless this thread returns from
// executePlan/reattachExecute. Therefore, we launch another thread to operate on the
// grpcObserver and send the responses, while this thread will exit from the
// executePlan/reattachExecute call, allowing GRPC to send the OnReady events.
// See https://github.com/grpc/grpc-java/issues/7361

backgroundThread = Some(
Expand All @@ -101,7 +102,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
try {
execute(lastConsumedStreamIndex)
} catch {
// This is executing in it's own thread, so need to handle RPC error like the
// This is executing in its own thread, so we need to handle RPC errors like the
// SparkConnectService handlers do.
ErrorUtils.handleError(
"async-grpc-response-sender",
Expand All @@ -125,12 +126,12 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
// Start the thread and exit
backgroundThread.foreach(_.start())
} else {
// Non reattachable execute runs directly in the GRPC thread.
// Non-reattachable execute runs directly in the GRPC thread.
try {
execute(lastConsumedStreamIndex)
} finally {
executeHolder.removeGrpcResponseSender(this)
// Non reattachable executions release here immediately.
// Non-reattachable executions release here immediately.
// (Reattachable executions release with ReleaseExecute RPC.)
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
}
Expand All @@ -139,7 +140,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](

/**
* This method is called repeatedly during the query execution to enqueue a new message to be
* send to the client about the current query progress. The message is not directly send to the
* sent to the client about the current query progress. The message is not directly sent to the
* client, but rather enqueued in the response observer.
*/
private def enqueueProgressMessage(force: Boolean = false): Unit = {
Expand All @@ -161,8 +162,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
.addAllStages(stages.map(_.toProto()).asJava)
.setNumInflightTasks(inflightTasks))
.build()
// There is a special case when the response observer has alreaady determined
// that the final message is send (and the stream will be closed) but we might want
// There is a special case when the response observer has already determined
// that the final message is sent (and the stream will be closed) but we might want
// to send the progress message. In this case we ignore the result of the `onNext`
// call.
executeHolder.responseObserver.tryOnNext(response)
Expand Down Expand Up @@ -193,7 +194,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
/**
* Attach to the executionObserver, consume responses from it, and send them to grpcObserver.
*
* In non reattachable execution, it will keep sending responses until the query finishes. In
* In non-reattachable execution, it will keep sending responses until the query finishes. In
* reattachable execution, it can finish earlier after reaching a time deadline or size limit.
*
* After this function finishes, the grpcObserver is closed with either onCompleted or onError.
Expand Down Expand Up @@ -303,7 +304,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
assert(finished == false)
} else {
// If it wasn't sent, time deadline must have been reached before stream became available,
// or it was interrupted. Will exit in the next loop iterattion.
// or it was interrupted. Will exit in the next loop iteration.
assert(deadlineLimitReached || interrupted)
}
} else if (streamFinished) {
Expand Down