1. 程式人生 > Crail-spark-io原始碼閱讀

Crail-spark-io原始碼閱讀

Crail-spark-io原始碼閱讀

storage

CrailDispatcher.scala

registerShuffle方法

  /**
   * Registers a shuffle with the dispatcher and lays out its directory tree in Crail.
   *
   * A fresh `CrailShuffleStore` is offered to `shuffleCache` via `putIfAbsent`, so a store
   * already present for the same id is left in place. Then the directory
   * `<shuffleDir>/shuffle_<shuffleId>` (type DIRECTORY) and one `part_<i>` node (type
   * MULTIFILE) per partition are created. All create calls are issued before any result is
   * awaited. After every result has been obtained, `syncDir()` is called on each created node.
   *
   * @param shuffleId  id of the shuffle being registered; used in the directory name
   * @param numMaps    number of map tasks; not used by this method
   * @param partitions number of `part_<i>` nodes to create, for i in `0 until partitions`
   */
  def registerShuffle(shuffleId: Int, numMaps: Int, partitions: Int): Unit = {
    // NOTE(review): the previous value returned by putIfAbsent is ignored, so registering the
    // same shuffleId twice still re-issues the creates below; how Crail handles an existing
    // path is not visible here — confirm this is intended.
    shuffleCache.putIfAbsent(shuffleId, new CrailShuffleStore)

    val shuffleIdDir = shuffleDir + "/shuffle_" + shuffleId

    // Issue every create up front (parent directory first, then partitions in index order)
    // without blocking, so the requests can be in flight together.
    val dirFuture = fs.create(shuffleIdDir, CrailNodeType.DIRECTORY,
      CrailStorageClass.PARENT, CrailLocationClass.DEFAULT, true)
    val partFutures = (0 until partitions).map { i =>
      fs.create(shuffleIdDir + "/part_" + i, CrailNodeType.MULTIFILE,
        CrailStorageClass.PARENT, CrailLocationClass.DEFAULT, true)
    }

    // Both maps are strict: every creation result is awaited before any node is synced.
    val createdNodes = (dirFuture +: partFutures).map(_.get())
    createdNodes.foreach(_.syncDir())
  }