
A Brief Introduction to Utils, Spark's Most Frequently Used Utility Class


For the preface of the book *Understanding Spark: Core Ideas and Source Code Analysis* (《深入理解Spark:核心思想與源代碼分析》), see the post announcing the book's publication.

For Chapter 1 of the book, see "Chapter 1: Environment Preparation".

For Chapter 2 of the book, see "Chapter 2: Spark Design Philosophy and Basic Architecture".

For the first part of Chapter 3, see "Understanding Spark: Core Ideas and Source Code Analysis — Initialization of SparkContext (Part 1)".

For the second part of Chapter 3, see "Understanding Spark: Core Ideas and Source Code Analysis — Initialization of SparkContext (Part 2)".

For the third part of Chapter 3, see "Understanding Spark: Core Ideas and Source Code Analysis — Initialization of SparkContext (Part 3)".

For the fourth part of Chapter 3, see "Understanding Spark: Core Ideas and Source Code Analysis — Initialization of SparkContext (Part 4)".


Utils is one of the most frequently used utility classes in Spark. If you are not interested in its implementation, skipping it will not hurt your understanding of Spark much. For someone just starting out with Scala or Spark, however, reading through the implementation of Utils is a good way to get familiar with both.

The commonly used methods provided by Utils are introduced one by one below.

1. localHostName

Description: gets the local host name.


  def localHostName(): String = {
    customHostname.getOrElse(localIpAddressHostname)
  }

2. getDefaultPropertiesFile

Description: gets the default Spark properties file (spark-defaults.conf), looking first in SPARK_CONF_DIR and then in SPARK_HOME/conf.

  def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
    env.get("SPARK_CONF_DIR")
      .orElse(env.get("SPARK_HOME").map{ t => s"$t${File.separator}conf"})
      .map { t => new File(s"$t${File.separator}spark-defaults.conf")}
      .filter(_.isFile)
      .map(_.getAbsolutePath)
      .orNull
  }
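The lookup order above (an explicit SPARK_CONF_DIR wins; otherwise fall back to $SPARK_HOME/conf) can be illustrated with a small standalone sketch. `resolvePropertiesDir` is a hypothetical helper that stops at the directory level so no real files are needed:

```scala
object PropertiesFileSketch {
  // Mirrors the Option chain in getDefaultPropertiesFile, minus the
  // filesystem check: SPARK_CONF_DIR first, then $SPARK_HOME/conf.
  def resolvePropertiesDir(env: Map[String, String]): Option[String] =
    env.get("SPARK_CONF_DIR")
      .orElse(env.get("SPARK_HOME").map(h => s"$h/conf"))

  def main(args: Array[String]): Unit = {
    assert(resolvePropertiesDir(Map("SPARK_CONF_DIR" -> "/etc/spark")) == Some("/etc/spark"))
    assert(resolvePropertiesDir(Map("SPARK_HOME" -> "/opt/spark")) == Some("/opt/spark/conf"))
    assert(resolvePropertiesDir(Map.empty).isEmpty)
    println("ok")
  }
}
```

The real method additionally requires that spark-defaults.conf exists in the resolved directory (`.filter(_.isFile)`), returning null otherwise.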

3. loadDefaultSparkProperties

Description: loads the Spark properties (keys starting with "spark.") from the given file; if no file is specified, loads them from the default Spark properties file.

  def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
    val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
    Option(path).foreach { confFile =>
      getPropertiesFromFile(confFile).filter { case (k, v) =>
        k.startsWith("spark.")
      }.foreach { case (k, v) =>
        conf.setIfMissing(k, v)
        sys.props.getOrElseUpdate(k, v)
      }
    }
    path
  }

4. getCallSite

Description: gets the current call stack of the SparkContext. The method of the frame closest to the bottom of the stack that still belongs to Spark or Scala core is stored in lastSparkMethod, and that frame is placed at the top of callStack; the user-code frame closest to the top of the stack is appended to callStack, with its line number stored in firstUserLine and its file name in firstUserFile. The returned CallSite case class holds a short form built from these values and a long form whose depth defaults to 20 frames. In the JavaWordCount example, the result is:

  • short form: JavaSparkContext at JavaWordCount.java:44;
  • long form: org.apache.spark.api.java.JavaSparkContext.&lt;init&gt;(JavaSparkContext.scala:61) org.apache.spark.examples.JavaWordCount.main(JavaWordCount.java:44).
  def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
    val trace = Thread.currentThread.getStackTrace().filterNot { ste: StackTraceElement =>
      ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace")
    }
    var lastSparkMethod = "<unknown>"
    var firstUserFile = "<unknown>"
    var firstUserLine = 0
    var insideSpark = true
    var callStack = new ArrayBuffer[String]() :+ "<unknown>"

    for (el <- trace) {
      if (insideSpark) {
        if (skipClass(el.getClassName)) {
          lastSparkMethod = if (el.getMethodName == "<init>") {
            el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
          } else {
            el.getMethodName
          }
          callStack(0) = el.toString // Put last Spark method on top of the stack trace.
        } else {
          firstUserLine = el.getLineNumber
          firstUserFile = el.getFileName
          callStack += el.toString
          insideSpark = false
        }
      } else {
        callStack += el.toString
      }
    }
    val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
    CallSite(
      shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine",
      longForm = callStack.take(callStackDepth).mkString("\n"))
  }

5. startServiceOnPort

Description: in Scala, as in other languages with first-class functions, functions can be passed as arguments. This method starts a service by calling back the startService function passed in, and finally returns the service and port that startService returned.

If an exception occurs during startup, it retries on successive ports until the maximum number of retries, maxRetries, is reached.

  def startServiceOnPort[T](
      startPort: Int,
      startService: Int => (T, Int),
      conf: SparkConf,
      serviceName: String = ""): (T, Int) = {
    require(startPort == 0 || (1024 <= startPort && startPort < 65536),
      "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
    val maxRetries = portMaxRetries(conf)
    for (offset <- 0 to maxRetries) {
      val tryPort = if (startPort == 0) {
        startPort
      } else {
        ((startPort + offset - 1024) % (65536 - 1024)) + 1024
      }
      try {
        val (service, port) = startService(tryPort)
        logInfo(s"Successfully started service$serviceString on port $port.")
        return (service, port)
      } catch {
        case e: Exception if isBindCollision(e) =>
          if (offset >= maxRetries) {
            val exceptionMessage =
              s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
            val exception = new BindException(exceptionMessage)
            exception.setStackTrace(e.getStackTrace)
            throw exception
          }
          logWarning(s"Service$serviceString could not bind on port $tryPort. " +
            s"Attempting port ${tryPort + 1}.")
      }
    }
    throw new SparkException(s"Failed to start service$serviceString on port $startPort")
  }
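The interesting detail above is the port-probing arithmetic: successive offsets wrap around inside the non-privileged range [1024, 65535] rather than running past 65535. A minimal sketch of just that expression:

```scala
object PortRetrySketch {
  // Same arithmetic as in startServiceOnPort: 0 means "let the OS pick",
  // any other start port cycles through [1024, 65535] as the offset grows.
  def tryPort(startPort: Int, offset: Int): Int =
    if (startPort == 0) startPort
    else ((startPort + offset - 1024) % (65536 - 1024)) + 1024

  def main(args: Array[String]): Unit = {
    assert(tryPort(7077, 0) == 7077)  // first attempt uses the requested port
    assert(tryPort(7077, 1) == 7078)  // then increments
    assert(tryPort(65535, 1) == 1024) // and wraps back into the valid range
    assert(tryPort(0, 5) == 0)        // 0 always stays 0 (ephemeral port)
    println("ok")
  }
}
```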

6. createDirectory

Description: creates a temporary directory named with a "spark-" prefix plus a UUID. If creation fails it retries, at most 10 times.

  def createDirectory(root: String, namePrefix: String = "spark"): File = {
    var attempts = 0
    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
    var dir: File = null
    while (dir == null) {
      attempts += 1
      if (attempts > maxAttempts) {
        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
          maxAttempts + " attempts!")
      }
      try {
        dir = new File(root, "spark-" + UUID.randomUUID.toString)
        if (dir.exists() || !dir.mkdirs()) {
          dir = null
        }
      } catch { case e: SecurityException => dir = null; }
    }

    dir
  }
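The control flow above is a general "retry until success or budget exhausted" loop. A generic sketch of the same pattern (`retryUntilDefined` is a hypothetical helper, not a Spark API):

```scala
object RetrySketch {
  // Keep attempting a fallible operation until it yields a value,
  // or give up after maxAttempts tries - the same shape as createDirectory.
  def retryUntilDefined[T](maxAttempts: Int)(op: Int => Option[T]): T = {
    var attempts = 0
    var result: Option[T] = None
    while (result.isEmpty) {
      attempts += 1
      if (attempts > maxAttempts)
        throw new RuntimeException(s"gave up after $maxAttempts attempts")
      result = op(attempts)
    }
    result.get
  }

  def main(args: Array[String]): Unit = {
    // Succeeds on the third attempt, mirroring mkdirs() failing twice.
    val v = retryUntilDefined(10)(n => if (n >= 3) Some(n) else None)
    assert(v == 3)
    println("ok")
  }
}
```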

7. getOrCreateLocalRootDirs

Description: uses the spark.local.dir configuration as the root directories for local files. Before creating the first- and second-level directories, it makes sure each root directory exists, then calls createDirectory to create a first-level directory under it.

  private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
    if (isRunningInYarnContainer(conf)) {
      getYarnLocalDirs(conf).split(",")
    } else {
      Option(conf.getenv("SPARK_LOCAL_DIRS"))
        .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
        .split(",")
        .flatMap { root =>
          try {
            val rootDir = new File(root)
            if (rootDir.exists || rootDir.mkdirs()) {
              val dir = createDirectory(root)
              chmod700(dir)
              Some(dir.getAbsolutePath)
            } else {
              logError(s"Failed to create dir in $root. Ignoring this directory.")
              None
            }
          } catch {
            case e: IOException =>
              logError(s"Failed to create local root dir in $root. Ignoring this directory.")
              None
          }
        }
        .toArray
    }
  }

8. getLocalDir

Description: returns the first of Spark's local first-level directories.

  def getLocalDir(conf: SparkConf): String = {
    getOrCreateLocalRootDirs(conf)(0)
  }

9. createTempDir

Description: creates a temporary directory under a Spark first-level directory and registers it in shutdownDeletePaths: scala.collection.mutable.HashSet[String].

  def createTempDir(
      root: String = System.getProperty("java.io.tmpdir"),
      namePrefix: String = "spark"): File = {
    val dir = createDirectory(root, namePrefix)
    registerShutdownDeleteDir(dir)
    dir
  }

10. registerShutdownDeleteDir

Description: registers a directory in shutdownDeletePaths: scala.collection.mutable.HashSet[String] so that it is deleted when the process exits.

  def registerShutdownDeleteDir(file: File) {
    val absolutePath = file.getAbsolutePath()
    shutdownDeletePaths.synchronized {
      shutdownDeletePaths += absolutePath
    }
  }

11. hasRootAsShutdownDeleteDir

Description: checks whether the file falls under one of the files or directories registered for deletion at shutdown. shutdownDeletePaths: scala.collection.mutable.HashSet[String] stores the files and directories to be deleted when the process shuts down.

  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
    val absolutePath = file.getAbsolutePath()
    val retval = shutdownDeletePaths.synchronized {
      shutdownDeletePaths.exists { path =>
        !absolutePath.equals(path) && absolutePath.startsWith(path)
      }
    }
    if (retval) {
      logInfo("path = " + file + ", already present as root for deletion.")
    }
    retval
  }
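The containment test above is a plain string-prefix check over the registered set. A standalone sketch of the same predicate:

```scala
object ShutdownDeleteSketch {
  // A file "has a root" in the delete set when some registered path is a
  // strict prefix of its absolute path (the path itself does not count).
  def hasRegisteredRoot(absolutePath: String, registered: Set[String]): Boolean =
    registered.exists(p => absolutePath != p && absolutePath.startsWith(p))

  def main(args: Array[String]): Unit = {
    val roots = Set("/tmp/spark-abc")
    assert(hasRegisteredRoot("/tmp/spark-abc/blockmgr", roots)) // child of a root
    assert(!hasRegisteredRoot("/tmp/spark-abc", roots))         // the root itself
    assert(!hasRegisteredRoot("/tmp/other", roots))
    println("ok")
  }
}
```

Note that because this is a raw prefix check rather than a path-component check, a sibling such as /tmp/spark-abcd would also match; the original code accepts that trade-off for simplicity.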

12. deleteRecursively

Description: deletes a file, or a directory together with its subdirectories and files, and removes the path from shutdownDeletePaths: scala.collection.mutable.HashSet[String].

  def deleteRecursively(file: File) {
    if (file != null) {
      try {
        if (file.isDirectory && !isSymlink(file)) {
          var savedIOException: IOException = null
          for (child <- listFilesSafely(file)) {
            try {
              deleteRecursively(child)
            } catch {
              case ioe: IOException => savedIOException = ioe
            }
          }
          if (savedIOException != null) {
            throw savedIOException
          }
          shutdownDeletePaths.synchronized {
            shutdownDeletePaths.remove(file.getAbsolutePath)
          }
        }
      } finally {
        if (!file.delete()) {
          if (file.exists()) {
            throw new IOException("Failed to delete: " + file.getAbsolutePath)
          }
        }
      }
    }
  }

13. getSparkClassLoader

Description: gets the ClassLoader that loaded the current class.

def getSparkClassLoader = getClass.getClassLoader

14. getContextOrSparkClassLoader

Description: gets the thread context ClassLoader, falling back to the ClassLoader that loaded Spark when no context ClassLoader is set.

 def getContextOrSparkClassLoader =
   Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)

15. newDaemonCachedThreadPool

Description: creates a cached thread pool of daemon threads using Executors.newCachedThreadPool.

  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
    val threadFactory = namedThreadFactory(prefix)
    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
  }
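The method relies on namedThreadFactory, which is not shown here. A sketch of what such a factory might look like; this is an illustration of the daemon-thread-pool idea, not Spark's actual implementation:

```scala
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger

object ThreadFactorySketch {
  // Produces daemon threads with a numbered name prefix, so pool threads
  // are identifiable in thread dumps and never keep the JVM alive on exit.
  def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }

  def main(args: Array[String]): Unit = {
    val factory = namedDaemonFactory("worker")
    val t = factory.newThread(new Runnable { def run(): Unit = () })
    assert(t.isDaemon)
    assert(t.getName == "worker-1")
    println("ok")
  }
}
```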

16. doFetchFile

Description: downloads a file. Depending on the URI scheme, it fetches over HTTP, HTTPS or FTP via URLConnection, copies a local file, or reads from a Hadoop filesystem.

  private def doFetchFile(url: String, targetDir: File, filename: String, conf: SparkConf,
      securityMgr: SecurityManager, hadoopConf: Configuration) {
    val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
    val targetFile = new File(targetDir, filename)
    val uri = new URI(url)
    val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
    Option(uri.getScheme).getOrElse("file") match {
      case "http" | "https" | "ftp" =>
        logInfo("Fetching " + url + " to " + tempFile)
        var uc: URLConnection = null
        if (securityMgr.isAuthenticationEnabled()) {
          logDebug("fetchFile with security enabled")
          val newuri = constructURIForAuthentication(uri, securityMgr)
          uc = newuri.toURL().openConnection()
          uc.setAllowUserInteraction(false)
        } else {
          logDebug("fetchFile not using security")
          uc = new URL(url).openConnection()
        }
        val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
        uc.setConnectTimeout(timeout)
        uc.setReadTimeout(timeout)
        uc.connect()
        val in = uc.getInputStream()
        downloadFile(url, in, tempFile, targetFile, fileOverwrite)
      case "file" =>
        val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
        copyFile(url, sourceFile, targetFile, fileOverwrite)
      case _ =>
        val fs = getHadoopFileSystem(uri, hadoopConf)
        val in = fs.open(new Path(uri))
        downloadFile(url, in, tempFile, targetFile, fileOverwrite)
    }
  }

17. fetchFile

Description: if the file is cached locally, fetches it from the cache; otherwise downloads it remotely. Afterwards, files in .tar, .tar.gz or .tgz format are untarred, and the chmod shell command is invoked to add a+x permissions to the file.

  def fetchFile(
      url: String,
      targetDir: File,
      conf: SparkConf,
      securityMgr: SecurityManager,
      hadoopConf: Configuration,
      timestamp: Long,
      useCache: Boolean) {
    val fileName = url.split("/").last
    val targetFile = new File(targetDir, fileName)
    val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
    if (useCache && fetchCacheEnabled) {
      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
      val lockFileName = s"${url.hashCode}${timestamp}_lock"
      val localDir = new File(getLocalDir(conf))
      val lockFile = new File(localDir, lockFileName)
      val raf = new RandomAccessFile(lockFile, "rw")
      val lock = raf.getChannel().lock()
      val cachedFile = new File(localDir, cachedFileName)
      try {
        if (!cachedFile.exists()) {
          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
        }
      } finally {
        lock.release()
      }
      copyFile(
        url,
        cachedFile,
        targetFile,
        conf.getBoolean("spark.files.overwrite", false)
      )
    } else {
      doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
    }
    if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
      logInfo("Untarring " + fileName)
      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
    } else if (fileName.endsWith(".tar")) {
      logInfo("Untarring " + fileName)
      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
    }
    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
  }

18. executeAndGetOutput

Description: runs a command and captures its output. It calls join on stdoutThread so that the current thread waits until stdoutThread has finished reading the output.

  def executeAndGetOutput(
      command: Seq[String],
      workingDir: File = new File("."),
      extraEnvironment: Map[String, String] = Map.empty): String = {
    val builder = new ProcessBuilder(command: _*)
      .directory(workingDir)
    val environment = builder.environment()
    for ((key, value) <- extraEnvironment) {
      environment.put(key, value)
    }
    val process = builder.start()
    new Thread("read stderr for " + command(0)) {
      override def run() {
        for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
          System.err.println(line)
        }
      }
    }.start()
    val output = new StringBuffer
    val stdoutThread = new Thread("read stdout for " + command(0)) {
      override def run() {
        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
          output.append(line)
        }
      }
    }
    stdoutThread.start()
    val exitCode = process.waitFor()
    stdoutThread.join()   // Wait for it to finish reading output
    if (exitCode != 0) {
      logError(s"Process $command exited with code $exitCode: $output")
      throw new SparkException(s"Process $command exited with code $exitCode")
    }
    output.toString
  }

19. memoryStringToMb

Description: converts a memory-size string (with an optional k, m, g or t suffix; no suffix means bytes) into an integer number of megabytes.

  def memoryStringToMb(str: String): Int = {
    val lower = str.toLowerCase
    if (lower.endsWith("k")) {
      (lower.substring(0, lower.length - 1).toLong / 1024).toInt
    } else if (lower.endsWith("m")) {
      lower.substring(0, lower.length - 1).toInt
    } else if (lower.endsWith("g")) {
      lower.substring(0, lower.length - 1).toInt * 1024
    } else if (lower.endsWith("t")) {
      lower.substring(0, lower.length - 1).toInt * 1024 * 1024
    } else { // no suffix, so it's just a number in bytes
      (lower.toLong / 1024 / 1024).toInt
    }
  }
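Since this conversion depends only on string handling and arithmetic, it can be re-implemented as a standalone sketch and exercised directly:

```scala
object MemoryStringSketch {
  // Same suffix handling as Utils.memoryStringToMb: k/m/g/t suffixes are
  // scaled relative to megabytes; a bare number is interpreted as bytes.
  def memoryStringToMb(str: String): Int = {
    val lower = str.toLowerCase
    if (lower.endsWith("k")) (lower.dropRight(1).toLong / 1024).toInt
    else if (lower.endsWith("m")) lower.dropRight(1).toInt
    else if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
    else if (lower.endsWith("t")) lower.dropRight(1).toInt * 1024 * 1024
    else (lower.toLong / 1024 / 1024).toInt // no suffix: plain bytes
  }

  def main(args: Array[String]): Unit = {
    assert(memoryStringToMb("512m") == 512)
    assert(memoryStringToMb("2g") == 2048)
    assert(memoryStringToMb("2048k") == 2)
    assert(memoryStringToMb("1t") == 1024 * 1024)
    assert(memoryStringToMb("1048576") == 1) // bare number = bytes
    println("ok")
  }
}
```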

