Spark源码分析 – BlockManager
阅读量:5899 次

本文共 20490 字,大约阅读时间需要 68 分钟。


对于storage, 为何Spark需要storage模块?为了cache RDD

Spark的特点就是可以将RDD cache在memory或disk中,RDD是由partitions组成的,对应于block

首先每个节点都有一个BlockManager, 其中有一个是Driver(master), 其余的都是slave

master负责track所有的slave BlockManager的BlockManagerInfo, 而BlockManagerInfo中又track了该BlockManager管理的所有的block的BlockStatus
当slave上的block有任何变化的时候, 需要发送updateBlockInfo事件来更新master上block信息
典型的中心化设计, master和slave之间的通信通过actor来进行, 当然对于block数据的传输, 由于数据量比较大, 所以使用connectionManager(NIO或Netty)
所以自然需要BlockManagerMasterActor和BlockManagerSlaveActor, 参考

其中还有个BlockManagerMaster,负责wrap BlockManagerMasterActor, 比较confusing的是每个节点都会创建这个BlockManagerMaster, 只是在slave中不会真正创建BlockManagerMasterActor, 而是Ref, 不好的设计

而且由于BlockManager被master和slave公用, 所以提供了两者大部分接口, 而对于master部分都是直接wrap BlockManagerMaster, 而对于slave中的数据读写等就直接实现了, 设计不统一

总之, storage这个模块, 设计比较随意, 不是很合理, 也体现在一些细小的命名上, 给分析和理解带来了一些困难.


在SparkEnv的初始化中, 创建BlockManagerMaster和blockManager

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(      "BlockManagerMaster",      new BlockManagerMasterActor(isLocal)))    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
// 创建actor和actor ref // 对于BlockManagerMaster, 在master上创建BlockManagerMasterActor, 而在slave上创建BlockManagerMasterActor的ref
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {      if (isDriver) {        logInfo("Registering " + name)        actorSystem.actorOf(Props(newActor), name = name)      } else {        val driverHost: String = System.getProperty("spark.driver.host", "localhost")        val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt        Utils.checkHost(driverHost, "Expected hostname")        val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)        logInfo("Connecting to " + name + ": " + url)        actorSystem.actorFor(url)      }    }


1 BlockManagerId

BlockManagerId作为BlockManager唯一标识, 所以希望一个BlockManager只创建一个BlockManagerId 对象

在Scala里面实现Singleton比较晦涩, 这里是个典型的例子
将所有的构造函数设为private, 然后利用伴生对象的来创建对象实例

/** * This class represent an unique identifier for a BlockManager. * The first 2 constructors of this class is made private to ensure that * BlockManagerId objects can be created only using the apply method in * the companion object. This allows de-duplication of ID objects. * Also, constructor parameters are private to ensure that parameters cannot * be modified from outside this class. */private[spark] class BlockManagerId private (    private var executorId_ : String,    private var host_ : String,    private var port_ : Int,    private var nettyPort_ : Int  ) extends Externalizable {  private def this() = this(null, null, 0, 0)  // For deserialization only}
private[spark] object BlockManagerId {  /**   * Returns a [[org.apache.spark.storage.BlockManagerId]] for the given configuraiton.   *   * @param execId ID of the executor.   * @param host Host name of the block manager.   * @param port Port of the block manager.   * @param nettyPort Optional port for the Netty-based shuffle sender.   * @return A new [[org.apache.spark.storage.BlockManagerId]].   */  def apply(execId: String, host: String, port: Int, nettyPort: Int) =    getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))  def apply(in: ObjectInput) = {    val obj = new BlockManagerId()    obj.readExternal(in)    getCachedBlockManagerId(obj)  }  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()  def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {    blockManagerIdCache.putIfAbsent(id, id)    blockManagerIdCache.get(id)  }}


2 BlockManager

BlockManager是被master和slave公用的, 但对于master的逻辑都已经wrap在BlockManagerMaster中了

所以这里主要分析一些slave相关的接口逻辑, reportBlockStatus, get, put
其中put, get使用到memoryStore和diskStore, 参考

private[spark] class BlockManager(    executorId: String,    actorSystem: ActorSystem,    val master: BlockManagerMaster,    val defaultSerializer: Serializer,    maxMemory: Long)  extends Logging {  private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {} // BlockInfo的定义, 详细见下  val shuffleBlockManager = new ShuffleBlockManager(this)  private val blockInfo = new TimeStampedHashMap[String, BlockInfo] // 记录manage的所有block的BlockInfo [blockid,blockinfo]  private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)  private[storage] val diskStore: DiskStore = new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
val blockManagerId = BlockManagerId(executorId, connectionManager.id.host, connectionManager.id.port, nettyPort) // BlockManagerId
val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),   // 创建slaveActor, 貌似每个BlockManager都会创建slaveActor    name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
/**   * Initialize the BlockManager. Register to the BlockManagerMaster, and start the   * BlockManagerWorker actor.   */  private def initialize() {    master.registerBlockManager(blockManagerId, maxMemory, slaveActor) // 向master注册BlockManager, 如果本身就是driver, 啥都不做    BlockManagerWorker.startBlockManagerWorker(this) // 创建BlockManagerWorker用于和remote传输block,block比较大所以无法用akka    if (!BlockManager.getDisableHeartBeatsForTesting) {      heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { // 设定scheduler定期发送hb        heartBeat()      }    }  }

2.1 BlockInfo

BlockInfo关键是对block做了访问互斥, 访问block前需要, 先waitForReady

所以每个block, 都需要生成一个BlockInfo来经行互斥管理
BlockManagerMasterActor中updateBlockInfo事件更新的不是这个BlockInfo, 而是BlockManagerInfo.BlockStatus, 不太合理!

private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {    @volatile var pending: Boolean = true    @volatile var size: Long = -1L    @volatile var initThread: Thread = null    @volatile var failed = false    setInitThread()    private def setInitThread() {      // Set current thread as init thread - waitForReady will not block this thread      // (in case there is non trivial initialization which ends up calling waitForReady as part of      // initialization itself)      this.initThread = Thread.currentThread()    }    /**     * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).     * Return true if the block is available, false otherwise.     */    def waitForReady(): Boolean = {      if (initThread != Thread.currentThread() && pending) {        synchronized {          while (pending) this.wait()        }      }      !failed    }    /** Mark this BlockInfo as ready (i.e. block is finished writing) */    def markReady(sizeInBytes: Long) {      assert (pending)      size = sizeInBytes      initThread = null      failed = false      initThread = null      pending = false      synchronized {        this.notifyAll()      }    }    /** Mark this BlockInfo as ready but failed */    def markFailure() {      assert (pending)      size = 0      initThread = null      failed = true      initThread = null      pending = false      synchronized {        this.notifyAll()      }    }  }

2.2 reportBlockStatus

/**   * Tell the master about the current storage status of a block. This will send a block update   * message reflecting the current status, *not* the desired storage level in its block info.   * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.   *   * droppedMemorySize exists to account for when block is dropped from memory to disk (so it is still valid).   * This ensures that update in master will compensate for the increase in memory on slave.   */  def reportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L) {    val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize) // 如果返回false, 说明你发的blockid在master没有, 需要重新注册    if (needReregister) {      logInfo("Got told to reregister updating block " + blockId)      // Reregistering will report our new block for free.      asyncReregister()    }    logDebug("Told master about block " + blockId)  }  /**   * Actually send a UpdateBlockInfo message. Returns the mater's response,   * which will be true if the block was successfully recorded and false if   * the slave needs to re-register.   */  private def tryToReportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {    val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {      info.level match {        case null =>          (StorageLevel.NONE, 0L, 0L, false)        case level =>          val inMem = level.useMemory && memoryStore.contains(blockId)          val onDisk = level.useDisk && diskStore.contains(blockId)          val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication)          val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize          val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L          (storageLevel, memSize, diskSize, info.tellMaster)      }    }    if (tellMaster) {  // 把当前block的情况, disk和memory的使用情况报告给master      master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)    } else {      true    }  }

2.3 Get

/**   * Get block from local block manager, 在本地读取block   */  def getLocal(blockId: String): Option[Iterator[Any]] = {    val info = blockInfo.get(blockId).orNull    if (info != null) {      info.synchronized { // 对block的互斥访问        // In the another thread is writing the block, wait for it to become ready.        if (!info.waitForReady()) { // 等待block ready, block只能被线性的写入          // If we get here, the block write failed.          logWarning("Block " + blockId + " was marked as failure.")          return None        }        val level = info.level        // Look for the block in memory        if (level.useMemory) { // 如果storage level是用到memory的, 就先在memoryStore中试图取这个block            memoryStore.getValues(blockId) match {             case Some(iterator) =>              return Some(iterator) // 直接返回iterator            case None =>              logDebug("Block " + blockId + " not found in memory")          }        }        //前面在memory中没有找到, 所以继续在disk里面找
//Look for block on disk, potentially loading it back into memory if required        if (level.useDisk) {          if (level.useMemory && level.deserialized) { // MEMORY_AND_DISK, 没有序列化, 部分数据在disk            diskStore.getValues(blockId) match {              case Some(iterator) =>  // 从disk中取出这个block, 并重新放到memory中                // Put the block back in memory before returning it                // TODO: Consider creating a putValues that also takes in a iterator ?                val elements = new ArrayBuffer[Any]                elements ++= iterator                memoryStore.putValues(blockId, elements, level, true).data match {                  case Left(iterator2) => // 期望从putValues中得到存入block的iterator                    return Some(iterator2)                  case _ =>                    throw new Exception("Memory store did not return back an iterator")                }              case None =>                throw new Exception("Block " + blockId + " not found on disk, though it should be")            }          } else if (level.useMemory && !level.deserialized) { // MEMORY_AND_DISK_SER, 序列化            // Read it as a byte buffer into memory first, then return it            diskStore.getBytes(blockId) match { // 由于读取的是序列化数据, 使用getBytes              case Some(bytes) =>                // Put a copy of the block back in memory before returning it. Note that we can't                // put the ByteBuffer returned by the disk store as that's a memory-mapped file.                // The use of rewind assumes this.                assert (0 == bytes.position())                val copyForMemory = ByteBuffer.allocate(bytes.limit)                copyForMemory.put(bytes)                memoryStore.putBytes(blockId, copyForMemory, level) // 在memoryStore中缓存的仍然是序列化数据                bytes.rewind() // 反序列化需要重新读数据, 所以rewind                return Some(dataDeserialize(blockId, bytes)) // 但返回的需要反序列化后的数据              case None =>                throw new Exception("Block " + blockId + " not found on disk, though it should be")            }          } else { // DISK_ONLY, 没啥说的, 直接取disk读            diskStore.getValues(blockId) match {              case Some(iterator) =>                return Some(iterator)              case None =>                throw new Exception("Block " + blockId + " not found on disk, though it should be")            }          }        }      }    } else {      logDebug("Block " + blockId + " not registered locally")    }    return None  }
/**   * Get block from the local block manager as serialized bytes.   */  def getLocalBytes(blockId: String): Option[ByteBuffer] = {  //逻辑更简单......}
/**   * Get block from remote block managers.   */  def getRemote(blockId: String): Option[Iterator[Any]] = {    // Get locations of block    val locations = master.getLocations(blockId)    // Get block from remote locations    for (loc <- locations) {      val data = BlockManagerWorker.syncGetBlock( //使用BlockManagerWorker从remote获取block          GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))      if (data != null) {        return Some(dataDeserialize(blockId, data))      }      logDebug("The value of block " + blockId + " is null")    }    logDebug("Block " + blockId + " not found")    return None  }

2.3 Put

/**   * Put a new block of values to the block manager. Returns its (estimated) size in bytes.   */  def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,    tellMaster: Boolean = true) : Long = {    // Remember the block's storage level so that we can correctly drop it to disk if it needs    // to be dropped right after it got put into memory. Note, however, that other threads will    // not be able to get() this block until we call markReady on its BlockInfo.    val myInfo = {       val tinfo = new BlockInfo(level, tellMaster) // 创建新的BlockInfo      // Do atomically !      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) // check blockid的blockinfo是否已经存在      if (oldBlockOpt.isDefined) { // 如果存在就需要互斥        if (oldBlockOpt.get.waitForReady()) {          logWarning("Block " + blockId + " already exists on this machine; not re-adding it")          return oldBlockOpt.get.size        }        // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?        oldBlockOpt.get      } else {        tinfo      }    }    // If we need to replicate the data, we'll want access to the values, but because our    // put will read the whole iterator, there will be no values left. For the case where    // the put serializes data, we'll remember the bytes, above; but for the case where it    // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.    var valuesAfterPut: Iterator[Any] = null    // Ditto for the bytes after the put    var bytesAfterPut: ByteBuffer = null    // Size of the block in bytes (to return to caller)    var size = 0L    myInfo.synchronized { // 加锁, 开始真正的put      var marked = false      try {        if (level.useMemory) { // 如果可以用memory, 优先放memory里面          // Save it just to memory first, even if it also has useDisk set to true; we will later          // drop it to disk if the memory store can't hold it.          val res = memoryStore.putValues(blockId, values, level, true)          size = res.size          res.data match {            case Right(newBytes) => bytesAfterPut = newBytes            case Left(newIterator) => valuesAfterPut = newIterator          }        } else { // 否则存到disk上          // Save directly to disk.          // Don't get back the bytes unless we replicate them.          val askForBytes = level.replication > 1          val res = diskStore.putValues(blockId, values, level, askForBytes)          size = res.size          res.data match {            case Right(newBytes) => bytesAfterPut = newBytes            case _ =>          }        }        // Now that the block is in either the memory or disk store, let other threads read it,        // and tell the master about it.        marked = true  // 释放blockinfo上的互斥条件, 让其他线程可以访问改block        myInfo.markReady(size)        if (tellMaster) {          reportBlockStatus(blockId, myInfo) // 通知master, block状态变化        }      } finally {        // If we failed at putting the block to memory/disk, notify other possible readers        // that it has failed, and then remove it from the block info map.        if (! marked) { // 如果put失败, 需要做些clear工作          // Note that the remove must happen before markFailure otherwise another thread          // could've inserted a new BlockInfo before we remove it.          blockInfo.remove(blockId)          myInfo.markFailure()          logWarning("Putting block " + blockId + " failed")        }      }    }    // Replicate block if required    if (level.replication > 1) {      val remoteStartTime = System.currentTimeMillis      // Serialize the block if not already done      if (bytesAfterPut == null) {        if (valuesAfterPut == null) {          throw new SparkException(            "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")        }        bytesAfterPut = dataSerialize(blockId, valuesAfterPut)      }      replicate(blockId, bytesAfterPut, level) // 做replicate      logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))    }    BlockManager.dispose(bytesAfterPut)    return size  }
/**   * Put a new block of serialized bytes to the block manager.   */  def putBytes(    blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {    //逻辑比较简单......  }


/**   * Replicate block to another node.   */  var cachedPeers: Seq[BlockManagerId] = null  private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {    val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)    if (cachedPeers == null) {      cachedPeers = master.getPeers(blockManagerId, level.replication - 1) //找到可用于replica的peers    }    for (peer: BlockManagerId <- cachedPeers) {  //把需要replica的block放到这些peer上去      val start = System.nanoTime      data.rewind()      if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel), //通过BlockManagerWorker传输block数据        new ConnectionManagerId(peer.host, peer.port))) {        logError("Failed to call syncPutBlock to " + peer)      }      logDebug("Replicated BlockId " + blockId + " once used " +        (System.nanoTime - start) / 1e6 + " s; The size of the data is " +        data.limit() + " bytes.")    }  }

2.3 dropFromMemory

/**   * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory   * store reaches its limit and needs to free up space.   */  def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {    logInfo("Dropping block " + blockId + " from memory")    val info = blockInfo.get(blockId).orNull    if (info != null)  {      info.synchronized {  //获取blockInfo的互斥        // required ? As of now, this will be invoked only for blocks which are ready        // But in case this changes in future, adding for consistency sake.        if (! info.waitForReady() ) {          // If we get here, the block write failed.          logWarning("Block " + blockId + " was marked as failure. Nothing to drop")          return        }        val level = info.level        if (level.useDisk && !diskStore.contains(blockId)) { // 如果使用disk, 就把memory中要删除的写入disk          logInfo("Writing block " + blockId + " to disk")          data match {            case Left(elements) =>              diskStore.putValues(blockId, elements, level, false)            case Right(bytes) =>              diskStore.putBytes(blockId, bytes, level)          }        }        val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L // 计算出从memory中drop掉的size        val blockWasRemoved = memoryStore.remove(blockId)  // 从memoryStore drop掉block        if (info.tellMaster) {          reportBlockStatus(blockId, info, droppedMemorySize) // 通知master, block信息变化        }        if (!level.useDisk) {          // The block is completely gone from this node; forget it so we can put() it again later.          blockInfo.remove(blockId) // 如果没有使用disk, 那么从memory中删除, 意味着完全删除这个block        }      }    } else {      // The block has already been dropped    }  }


