Commit 3699763

squito authored and Marcelo Vanzin committed
[SPARK-26697][CORE] Log local & remote block sizes.
## What changes were proposed in this pull request?

To help debug failed or slow tasks, it's really useful to know the size of the blocks being fetched. That information is available at the debug level, but debug logs aren't on in general. There is already an info-level log line, which this change augments a little.

## How was this patch tested?

Ran a very basic local-cluster mode app and looked at the logs. Example line:

```
INFO ShuffleBlockFetcherIterator: Getting 2 (194.0 B) non-empty blocks including 1 (97.0 B) local blocks and 1 (97.0 B) remote blocks
```

Full suite via Jenkins.

Closes apache#23621 from squito/SPARK-26697.

Authored-by: Imran Rashid <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 8d667c5 commit 3699763
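When scanning executor logs for slow fetches, it can help to pull the counts and sizes out of the augmented line. The following is a minimal sketch, not part of this commit: `BlockSizeLogParser`, `FetchSummary`, and the regular expression are hypothetical names written only against the example line in the commit message, so the pattern may need adjusting for other log layouts.

```scala
// Hypothetical helper, not part of Spark: extracts the block counts and
// human-readable sizes from the log line shown in the commit message.
object BlockSizeLogParser {
  // Illustrative holder for the six values carried by one log line.
  final case class FetchSummary(
      totalBlocks: Int, totalSize: String,
      localBlocks: Int, localSize: String,
      remoteBlocks: Int, remoteSize: String)

  // Unanchored so that a leading "INFO ShuffleBlockFetcherIterator: " prefix
  // (or any timestamp) is ignored. Assumes the exact wording of the example line.
  private val Line =
    ("""Getting (\d+) \(([^)]+)\) non-empty blocks including """ +
      """(\d+) \(([^)]+)\) local blocks and (\d+) \(([^)]+)\) remote blocks""").r.unanchored

  // Returns None for lines that do not match the assumed format.
  def parse(line: String): Option[FetchSummary] = line match {
    case Line(t, ts, l, ls, r, rs) =>
      Some(FetchSummary(t.toInt, ts, l.toInt, ls, r.toInt, rs))
    case _ => None
  }
}
```

Sizes are kept as strings because the log line carries already-formatted values such as `97.0 B`; converting them back to bytes would lose precision for larger units.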

1 file changed, 8 insertions(+), 2 deletions(-)

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

@@ -273,6 +273,8 @@ final class ShuffleBlockFetcherIterator(
     // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
     // at most maxBytesInFlight in order to limit the amount of data in flight.
     val remoteRequests = new ArrayBuffer[FetchRequest]
+    var localBlockBytes = 0L
+    var remoteBlockBytes = 0L
 
     for ((address, blockInfos) <- blocksByAddress) {
       if (address.executorId == blockManager.blockManagerId.executorId) {
@@ -284,13 +286,15 @@ final class ShuffleBlockFetcherIterator(
           case None => // do nothing.
         }
         localBlocks ++= blockInfos.map(_._1)
+        localBlockBytes += blockInfos.map(_._2).sum
         numBlocksToFetch += localBlocks.size
       } else {
         val iterator = blockInfos.iterator
         var curRequestSize = 0L
         var curBlocks = new ArrayBuffer[(BlockId, Long)]
         while (iterator.hasNext) {
           val (blockId, size) = iterator.next()
+          remoteBlockBytes += size
           if (size < 0) {
             throw new BlockException(blockId, "Negative block size " + size)
           } else if (size == 0) {
@@ -317,8 +321,10 @@ final class ShuffleBlockFetcherIterator(
         }
       }
     }
-    logInfo(s"Getting $numBlocksToFetch non-empty blocks including ${localBlocks.size}" +
-      s" local blocks and ${remoteBlocks.size} remote blocks")
+    val totalBytes = localBlockBytes + remoteBlockBytes
+    logInfo(s"Getting $numBlocksToFetch (${Utils.bytesToString(totalBytes)}) non-empty blocks " +
+      s"including ${localBlocks.size} (${Utils.bytesToString(localBlockBytes)}) local blocks and " +
+      s"${remoteBlocks.size} (${Utils.bytesToString(remoteBlockBytes)}) remote blocks")
     remoteRequests
   }
