@@ -43,8 +43,9 @@ import org.apache.spark.internal.config.Network
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.network._
-import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.client.StreamCallbackWithID
+import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.network.util.TransportConf
@@ -138,6 +139,8 @@ private[spark] class BlockManager(
   private val remoteReadNioBufferConversion =
     conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION)

+  private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES)
+
   val diskBlockManager = {
     // Only perform cleanup if an external service is not serving our shuffle files.
     val deleteFilesOnStop =
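
The value read here comes from `config.DISKSTORE_SUB_DIRECTORIES`. A minimal sketch of how it could be set, assuming the key is `spark.diskStore.subDirectories` with a default of 64 (verify both against the Spark version in use):

```scala
import org.apache.spark.SparkConf

// Hedged sketch, not part of the patch: executors on the same host must agree
// on this value, or the same-host file resolution added later in this diff
// would look for block files in the wrong subdirectory.
val conf = new SparkConf()
  .set("spark.diskStore.subDirectories", "64")
```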
@@ -411,6 +414,7 @@ private[spark] class BlockManager(

     val idFromMaster = master.registerBlockManager(
       id,
+      diskBlockManager.localDirsString,
       maxOnHeapMemory,
       maxOffHeapMemory,
       slaveEndpoint)
@@ -445,7 +449,7 @@ private[spark] class BlockManager(
   private def registerWithExternalShuffleServer() {
     logInfo("Registering executor with local external shuffle service.")
     val shuffleConfig = new ExecutorShuffleInfo(
-      diskBlockManager.localDirs.map(_.toString),
+      diskBlockManager.localDirsString,
       diskBlockManager.subDirsPerLocalDir,
       shuffleManager.getClass.getName)

@@ -500,7 +504,8 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
+    master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
+      maxOffHeapMemory, slaveEndpoint)
     reportAllBlocks()
   }

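
Both registration paths above now pass `diskBlockManager.localDirsString`, so the counterpart method in `BlockManagerMaster` has to grow the same parameter. That file is not part of this diff; the following signature is a plausible sketch inferred from the call sites, not the patch itself:

```scala
// Inferred from the call sites in this file -- treat as a sketch, the real
// change lives in BlockManagerMaster.scala, which this diff does not show.
def registerBlockManager(
    id: BlockManagerId,
    localDirs: Array[String],  // executor-local root dirs, newly reported to the master
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId
```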
@@ -827,10 +832,63 @@ private[spark] class BlockManager(
    */
   private[spark] def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
     val ct = implicitly[ClassTag[T]]
-    getRemoteManagedBuffer(blockId).map { data =>
+    getRemoteBlock(blockId, (data: ManagedBuffer) => {
       val values =
         serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
       new BlockResult(values, DataReadMethod.Network, data.size)
+    })
+  }
+
+  /**
+   * Get the remote block and transform it to the provided data type.
+   *
+   * If the block is persisted to disk by an executor running on the same host, it is first
+   * accessed directly via that executor's local directories. When the file is found, the
+   * provided transformation function (which is expected to open the file) is applied to it.
+   * If the transformation throws, block access falls back to fetching the block from a remote
+   * executor via the network.
+   *
+   * @param blockId identifies the block to get
+   * @param bufferTransformer this transformer is expected to open the file when the block is
+   *                          backed by a file, which guarantees the whole content can be loaded
+   * @tparam T result type
+   * @return the transformed block if it was found, otherwise None
+   */
+  private[spark] def getRemoteBlock[T](
+      blockId: BlockId,
+      bufferTransformer: ManagedBuffer => T): Option[T] = {
+    logDebug(s"Getting remote block $blockId")
+    require(blockId != null, "BlockId is null")
+
+    // Because all the remote blocks are registered in driver, it is not necessary to ask
+    // all the slave executors to get block status.
+    val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host)
+    if (locationsAndStatusOption.isEmpty) {
+      logDebug(s"Block $blockId is unknown by block manager master")
+      None
+    } else {
+      val locationsAndStatus = locationsAndStatusOption.get
+      val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize)
+
+      locationsAndStatus.localDirs.flatMap { localDirs =>
+        val blockDataOption =
+          readDiskBlockFromSameHostExecutor(blockId, localDirs, locationsAndStatus.status.diskSize)
+        val res = blockDataOption.flatMap { blockData =>
+          try {
+            Some(bufferTransformer(blockData))
+          } catch {
+            case NonFatal(e) =>
+              logDebug("Block from the same-host executor could not be opened: ", e)
+              None
+          }
+        }
+        logInfo(s"Reading $blockId from the disk of a same-host executor " +
+          (if (res.isDefined) "succeeded." else "failed."))
+        res
+      }.orElse {
+        fetchRemoteManagedBuffer(blockId, blockSize, locationsAndStatus).map(bufferTransformer)
+      }
     }
   }

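
Both `getRemoteValues` above and `getRemoteBytes` later in this diff are now thin wrappers over `getRemoteBlock`. A hedged usage sketch of a caller-supplied transformer follows; it assumes a caller inside the `org.apache.spark` package (the method is `private[spark]`), and `blockManager`/`blockId` are names presumed to be in scope:

```scala
import java.io.ByteArrayOutputStream
import org.apache.spark.network.buffer.ManagedBuffer

// Hedged sketch, not from the patch. The transformer must consume the whole
// buffer while it is open: on the same-host path the buffer wraps a file owned
// by another executor. The byte copy is only an illustrative consumer.
val bytesOption: Option[Array[Byte]] =
  blockManager.getRemoteBlock(blockId, (data: ManagedBuffer) => {
    val in = data.createInputStream()
    try {
      // data.size is a Long; .toInt is acceptable only in a sketch
      val out = new ByteArrayOutputStream(data.size.toInt)
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
      out.toByteArray
    } finally {
      in.close()
    }
  })
```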
@@ -861,22 +919,12 @@ private[spark] class BlockManager(
   }

   /**
-   * Get block from remote block managers as a ManagedBuffer.
+   * Fetch the block from remote block managers as a ManagedBuffer.
    */
-  private def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = {
-    logDebug(s"Getting remote block $blockId")
-    require(blockId != null, "BlockId is null")
-    var runningFailureCount = 0
-    var totalFailureCount = 0
-
-    // Because all the remote blocks are registered in driver, it is not necessary to ask
-    // all the slave executors to get block status.
-    val locationsAndStatus = master.getLocationsAndStatus(blockId)
-    val blockSize = locationsAndStatus.map { b =>
-      b.status.diskSize.max(b.status.memSize)
-    }.getOrElse(0L)
-    val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)
-
+  private def fetchRemoteManagedBuffer(
+      blockId: BlockId,
+      blockSize: Long,
+      locationsAndStatus: BlockManagerMessages.BlockLocationsAndStatus): Option[ManagedBuffer] = {
     // If the block size is above the threshold, we should pass our FileManager to
     // BlockTransferService, which will leverage it to spill the block; if not, then passed-in
     // null value means the block will be persisted in memory.
@@ -885,8 +933,9 @@ private[spark] class BlockManager(
     } else {
       null
     }
-
-    val locations = sortLocations(blockLocations)
+    var runningFailureCount = 0
+    var totalFailureCount = 0
+    val locations = sortLocations(locationsAndStatus.locations)
     val maxFetchFailures = locations.size
     var locationIterator = locations.iterator
     while (locationIterator.hasNext) {
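
The failure counters move here together with the loop they guard; the loop body itself is unchanged and therefore not shown in this hunk. A stripped-down sketch of the pattern the counters implement, where `fetchBlock` stands in for the real `blockTransferService.fetchBlockSync(...)` call and the location-refresh logic of the real loop is omitted:

```scala
import scala.util.control.NonFatal
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.BlockManagerId

// Hedged sketch of the retry pattern, not the actual loop body.
def fetchFirstSuccessful(
    locations: Seq[BlockManagerId],
    fetchBlock: BlockManagerId => ManagedBuffer): Option[ManagedBuffer] = {
  val maxFetchFailures = locations.size
  var totalFailureCount = 0
  val it = locations.iterator
  while (it.hasNext) {
    val loc = it.next()
    try {
      return Some(fetchBlock(loc))
    } catch {
      case NonFatal(_) =>
        totalFailureCount += 1
        if (totalFailureCount >= maxFetchFailures) {
          // every candidate location has failed once: give up entirely
          return None
        }
    }
  }
  None
}
```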
@@ -946,11 +995,37 @@ private[spark] class BlockManager(
       None
     }

+  /**
+   * Reads the block from the local directories of another executor which runs on the same host.
+   */
+  private[spark] def readDiskBlockFromSameHostExecutor(
+      blockId: BlockId,
+      localDirs: Array[String],
+      blockSize: Long): Option[ManagedBuffer] = {
+    val file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId.name)
+    if (file.exists()) {
+      val managedBuffer = securityManager.getIOEncryptionKey() match {
+        case Some(key) =>
+          // Encrypted blocks cannot be memory mapped; return a special object that does decryption
+          // and provides InputStream / FileRegion implementations for reading the data.
+          new EncryptedManagedBuffer(
+            new EncryptedBlockData(file, blockSize, conf, key))
+
+        case _ =>
+          val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
+          new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
+      }
+      Some(managedBuffer)
+    } else {
+      None
+    }
+  }
+
   /**
    * Get block from remote block managers as serialized bytes.
    */
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    getRemoteManagedBuffer(blockId).map { data =>
+    getRemoteBlock(blockId, (data: ManagedBuffer) => {
       // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
       // ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if
       // new path is stable.
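
`ExecutorDiskUtils.getFile` resolves the block file with the same hash-based layout `DiskBlockManager` uses when writing, which is why `subDirsPerLocalDir` has to agree between the reading and writing executor. A hedged re-derivation of that layout, with Spark's `JavaUtils.nonNegativeHash` approximated rather than called:

```scala
import java.io.File

// Hedged sketch of the resolution scheme, not the library code itself: a
// non-negative hash of the block file name picks the root local dir, and the
// next "digit" of the hash picks a two-hex-char subdirectory.
def resolveBlockFile(localDirs: Array[String], subDirsPerLocalDir: Int, name: String): File = {
  val raw = name.hashCode
  val hash = if (raw == Int.MinValue) 0 else math.abs(raw)  // approximates nonNegativeHash
  val localDir = localDirs(hash % localDirs.length)
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  new File(new File(localDir, f"$subDirId%02x"), name)
}
```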
@@ -959,7 +1034,7 @@ private[spark] class BlockManager(
       } else {
         ChunkedByteBuffer.fromManagedBuffer(data)
       }
-    }
+    })
   }

   /**