Skip to content

Commit

Permalink
Add option to disable measuring buffer copy (#11996)
Browse files Browse the repository at this point in the history
<!--

Thank you for contributing to RAPIDS Accelerator for Apache Spark!

Here are some guidelines to help the review process go smoothly.

1. Please write a description in this text box of the changes that are
being
   made.

2. Please ensure that you have written unit tests for the changes
made/features
   added.

3. If you are closing an issue please use one of the automatic closing
words as
noted here:
https://help.github.com/articles/closing-issues-using-keywords/

4. If your pull request is not ready for review but you want to make use
of the
continuous integration testing facilities please label it with `[WIP]`.

5. If your pull request is ready to be reviewed without requiring
additional
   work on top of it, then remove the `[WIP]` label (if present).

6. Once all work has been done and review has taken place please do not
add
features or make changes out of the scope of those requested by the
reviewer
(doing this just adds delays, as already-reviewed code ends up having to
be
re-reviewed/it is hard to tell what is new etc!). Further, please avoid
rebasing your branch during the review process, as this causes the
context
of any comments made by reviewers to be lost. If conflicts occur during
review then they should be resolved by merging into the branch used for
   making the pull request.

Many thanks in advance for your cooperation!

-->

This is the first step of #11995.

It adds an option to disable measuring copy buffer time in spark-rapids.
The option doesn't actually turn off the measurement inside the kudo
serializer yet; for now it only stops the copy time metrics from being reported.

---------

Signed-off-by: Ray Liu <[email protected]>
  • Loading branch information
liurenjie1024 authored Feb 6, 2025
1 parent d5da8eb commit b49aee9
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -130,7 +130,7 @@ class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
* @note The RAPIDS shuffle does not use this code.
*/
class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Array[DataType],
useKudo: Boolean)
useKudo: Boolean, kudoMeasureBufferCopy: Boolean)
extends Serializer with Serializable {

private lazy val kudo = {
Expand All @@ -143,7 +143,7 @@ class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Arr

override def newInstance(): SerializerInstance = {
if (useKudo) {
new KudoSerializerInstance(metrics, dataTypes, kudo)
new KudoSerializerInstance(metrics, dataTypes, kudo, kudoMeasureBufferCopy)
} else {
new GpuColumnarBatchSerializerInstance(metrics)
}
Expand Down Expand Up @@ -348,7 +348,8 @@ object SerializedTableColumn {
private class KudoSerializerInstance(
val metrics: Map[String, GpuMetric],
val dataTypes: Array[DataType],
val kudo: Option[KudoSerializer]
val kudo: Option[KudoSerializer],
val measureBufferCopyTime: Boolean,
) extends SerializerInstance {
private val dataSize = metrics(METRIC_DATA_SIZE)
private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
Expand Down Expand Up @@ -399,8 +400,10 @@ private class KudoSerializerInstance(

dataSize += writeMetric.getWrittenBytes
serCalcHeaderTime += writeMetric.getCalcHeaderTime
serCopyHeaderTime += writeMetric.getCopyHeaderTime
serCopyBufferTime += writeMetric.getCopyBufferTime
if (measureBufferCopyTime) {
serCopyHeaderTime += writeMetric.getCopyHeaderTime
serCopyBufferTime += writeMetric.getCopyBufferTime
}
}
} else {
withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ =>
Expand Down
12 changes: 12 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2047,6 +2047,15 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.booleanConf
.createWithDefault(false)

// Internal, startup-only boolean setting (default: false) for the Kudo shuffle serializer's
// copy-time metrics.
// NOTE(review): in this commit KudoSerializerInstance uses the flag to gate accumulation of
// BOTH the copy-header and the copy-buffer time metrics, while the key and doc text mention
// only buffer copy -- confirm that hiding the header metric as well is intended.
val SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED =
conf("spark.rapids.shuffle.kudo.serializer.measure.buffer.copy.enabled")
.doc("Enable or disable measuring buffer copy time when using Kudo serializer for the shuffle.")
.internal()
.startupOnly()
.booleanConf
.createWithDefault(false)


// USER FACING DEBUG CONFIGS

val SHUFFLE_COMPRESSION_MAX_BATCH_MEMORY =
Expand Down Expand Up @@ -3155,6 +3164,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val shuffleKudoSerializerEnabled: Boolean = get(SHUFFLE_KUDO_SERIALIZER_ENABLED)

// Lazily fetched value of SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED, obtained
// through get(...).
lazy val shuffleKudoMeasureBufferCopyEnabled: Boolean =
get(SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED)

def isUCXShuffleManagerMode: Boolean =
RapidsShuffleManagerMode
.withName(get(SHUFFLE_MANAGER_MODE)) == RapidsShuffleManagerMode.UCX
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -172,6 +172,9 @@ abstract class GpuShuffleExchangeExecBase(
import GpuMetric._

private lazy val useKudo = RapidsConf.SHUFFLE_KUDO_SERIALIZER_ENABLED.get(child.conf)
// Value of SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED looked up via child.conf;
// it is passed to GpuColumnarBatchSerializer as the kudoMeasureBufferCopy argument.
private lazy val kudoMeasureBufferCopy = RapidsConf
.SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED
.get(child.conf)

private lazy val useGPUShuffle = {
gpuOutputPartitioning match {
Expand Down Expand Up @@ -223,7 +226,7 @@ abstract class GpuShuffleExchangeExecBase(
// This value must be lazy because the child's output may not have been resolved
// yet in all cases.
private lazy val serializer: Serializer = new GpuColumnarBatchSerializer(
allMetrics, sparkTypes, useKudo)
allMetrics, sparkTypes, useKudo, kudoMeasureBufferCopy)

@transient lazy val inputBatchRDD: RDD[ColumnarBatch] = child.executeColumnar()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -114,7 +114,7 @@ class RapidsShuffleThreadedReaderSuite
val numMaps = 6
val keyValuePairsPerMap = 10
val serializer = new GpuColumnarBatchSerializer(Map.empty.withDefaultValue(NoopMetric),
Array.empty, false)
Array.empty, false, false)

// Make a mock BlockManager that will return RecordingManagedByteBuffers of data, so that we
// can ensure retain() and release() are properly called.
Expand Down

0 comments on commit b49aee9

Please sign in to comment.