
Summary of Issues Upgrading to Spark 1.5.2 on YARN

1    Upgrade Background

Our standalone production cluster has been running for half a year and is hitting resource bottlenecks; multi-user resource management has also become a visible problem. Migrating Spark onto YARN is currently the most practical solution.

Spark on YARN brings two advantages:

  • Fuller use of cluster resources and easier multi-user resource management;
  • Easier capacity expansion;

2    Problems Encountered

1)  Code calls System.exit(-1) but the job is reported as successful

Test code:

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("HiveTest")
  val initConf = SparkConstant.initConf(sparkConf)
  val sc = new SparkContext(initConf)
  sc.parallelize(1 to 2000000, 4).map(x => x % 0.24).map { x =>
    x + 0.1
  }.reduce(_ + _)
  System.exit(-1)   // exit with a non-zero code before sc.stop()
  sc.stop()
}

After the job exits, the ApplicationMaster nevertheless reports the application as successful:


One business application exits via System.exit(-1) whenever it hits an exception, which produced exactly the situation above.

ApplicationMaster log analysis:

16/04/08 11:07:29 INFO storage.MemoryStore: MemoryStore cleared
16/04/08 11:07:29 INFO storage.BlockManager: BlockManager stopped
16/04/08 11:07:29 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/04/08 11:07:29 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/04/08 11:07:29 INFO spark.SparkContext: Successfully stopped SparkContext
16/04/08 11:07:29 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.)
16/04/08 11:07:29 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/08 11:07:29 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.)
16/04/08 11:07:29 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/08 11:07:29 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
16/04/08 11:07:29 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/04/08 11:07:29 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1460021101912_10211

As the log shows, the ApplicationMaster notes that the job was stopped before the final status was reported, yet it still unregisters with SUCCEEDED. Why is the exit shown as a success? The relevant source is analyzed below:

if (!finished) {
  // This happens when the user application calls System.exit(). We have the choice
  // of either failing or succeeding at this point. We report success to avoid
  // retrying applications that have succeeded (System.exit(0)), which means that
  // applications that explicitly exit with a non-zero status will also show up as
  // succeeded in the RM UI.
  finish(finalStatus,
    ApplicationMaster.EXIT_SUCCESS,
    "Shutdown hook called before final status was reported.")
}
final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = {
  synchronized {
    if (!finished) {
      val inShutdown = ShutdownHookManager.inShutdown()
      logInfo(s"Final app status: $status, exitCode: $code" +
        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
      exitCode = code
      finalStatus = status
      finalMsg = msg
      finished = true
      if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
        logDebug("shutting down reporter thread")
        reporterThread.interrupt()
      }
      if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) {
        logDebug("shutting down user thread")
        userClassThread.interrupt()
      }
      if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
    }
  }
}

Even when the user code exits abnormally this way, the ApplicationMaster sets its exit code to 0, i.e. a normal exit. The reason, as the comment above explains, is that always reporting success prevents the ApplicationMaster from being retried; otherwise an application treated as failed would be resubmitted.

Workarounds:

  •  Throw the exception directly instead of calling System.exit. If you choose to throw, the ApplicationMaster takes the following code path:
    
case e: Throwable => {
  failureCount += 1
  if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
    finish(FinalApplicationStatus.FAILED,
      ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
        s"$failureCount time(s) from Reporter thread.")
This marks the job as FAILED, but it also causes the job to be retried.
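If the resubmission is the only concern, one hedged alternative (relying on the standard spark.yarn.maxAppAttempts property, which the original write-up does not mention) is to cap the number of application attempts at submit time, so a thrown exception fails the job once instead of triggering a retry:

import org.apache.spark.SparkConf

// Limit YARN application attempts; must be set before launch (e.g. passed via
// --conf on spark-submit for yarn-cluster mode).
val sparkConf = new SparkConf()
  .setAppName("HiveTest")
  .set("spark.yarn.maxAppAttempts", "1")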

  • Treat a job as successful only when the exit code is 0 and the Diagnostics message does not contain "Shutdown hook called before final status was reported"; a sketch of such a check follows the example code below.
     
def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("HiveTest")
  val initConf = SparkConstant.initConf(sparkConf)
  val sc = new SparkContext(initConf)
  sc.parallelize(1 to 2000000, 4).map(x => x % 0.24).map { x =>
    x + 0.1
  }.reduce(_ + _)

  // Throw instead of calling System.exit so the ApplicationMaster reports FAILED
  val e = new Exception("this is my exception")
  throw e
}
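For that second check, a minimal sketch against the YARN client API is shown below (the helper name reallySucceeded and the wiring around it are ours, not part of the original):

import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Decide whether a finished application really succeeded: the final status must
// be SUCCEEDED and the diagnostics must not contain the shutdown-hook message.
def reallySucceeded(appId: ApplicationId): Boolean = {
  val yarnClient = YarnClient.createYarnClient()
  yarnClient.init(new YarnConfiguration())
  yarnClient.start()
  try {
    val report = yarnClient.getApplicationReport(appId)
    val diagnostics = Option(report.getDiagnostics).getOrElse("")
    report.getFinalApplicationStatus == FinalApplicationStatus.SUCCEEDED &&
      !diagnostics.contains("Shutdown hook called before final status was reported")
  } finally {
    yarnClient.stop()
  }
}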

2)  Driver and executor PermGen space OOM

During computation, especially when loading Hive or HBase third-party packages, both the driver and the executors hit frequent PermGen space OOMs. Spark on YARN, like standalone mode, requires configuring the JVM options for the driver and executors. Our current configuration is:
spark.driver.extraJavaOptions           -XX:MaxPermSize=512m  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80  -XX:GCTimeLimit=5 -XX:GCHeapFreeLimit=95
spark.executor.extraJavaOptions         -XX:MaxPermSize=512m   -XX:+PrintGCDetails -XX:+PrintGCTimeStamps  -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:GCTimeLimit=5 -XX:GCHeapFreeLimit=95

  •  -XX:MaxPermSize=512m: enlarges the PermGen space; with the default of 128M, PermGen space OOMs occur;
  •  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps: prints GC logs to stdout, making it easy to observe GC behavior and memory usage during the computation;
  •  -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:GCTimeLimit=5 -XX:GCHeapFreeLimit=95: switches the GC policy to CMS; we plan to try G1 later;

3)  Custom MySQL driver causes reads and writes to fail

We wrapped the MySQL driver to protect internal data. In 1.4.0 it was used as follows:

val sqlContext = new SQLContext(sc)
DriverManager.registerDriver(new CBTMysqlDriver)
val props = new java.util.TreeMap[String, String]
props.put("url", "jdbc:CBTMysqlDriver://*******")
props.put("dbtable", "mysql.user") // database.tablename
props.put("driver", "CBTMysqlDriver")
val df2: DataFrame = sqlContext.read.format("jdbc").options(props).load()
val list = df2.collect()
list.foreach(x => println(x))
sc.stop()

In 1.4.0 this read from MySQL without problems, but after migrating to 1.5.2 reads and writes to MySQL failed. Investigating, we found the following code:
/**
 * :: DeveloperApi ::
 * Default mysql dialect to read bit/bitsets correctly.
 */
@DeveloperApi
case object MySQLDialect extends JdbcDialect {
  override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
      // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
      // byte arrays instead of longs.
      md.putLong("binarylong", 1)
      Some(LongType)
    } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
      Some(BooleanType)
    } else None
  }

  override def quoteIdentifier(colName: String): String = {
    s"`$colName`"
  }
}

Before reading data from MySQL, the driver first reads the schema of the target table; to do so it must choose the matching JDBC dialect, and the choice is made by:

override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")

Because our wrapped driver uses the jdbc:CBTMysqlDriver URL prefix, this check never matches it.

The fix is to implement our own MySQL dialect class:

class MySQLDialect extends JdbcDialect {
  override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
  override def getCatalystType(
                                sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
      // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
      // byte arrays instead of longs.
      md.putLong("binarylong", 1)
      Some(LongType)
    } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
      Some(BooleanType)
    } else None
  }
  override def quoteIdentifier(colName: String): String = {
    s"`$colName`"
  }
}
case object CBTMySQLDialect extends MySQLDialect {
  override def canHandle(url : String): Boolean = url.startsWith("jdbc:CBTMysqlDriver")
}
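For the custom dialect to take effect it presumably also needs to be registered with Spark's JdbcDialects before the read; a usage sketch reusing the identifiers from the 1.4.0 example above:

import java.sql.DriverManager
import org.apache.spark.sql.jdbc.JdbcDialects

// Register the custom dialect so URLs starting with "jdbc:CBTMysqlDriver" are
// matched, then read exactly as in the 1.4.0 example.
JdbcDialects.registerDialect(CBTMySQLDialect)
DriverManager.registerDriver(new CBTMysqlDriver)

val props = new java.util.TreeMap[String, String]
props.put("url", "jdbc:CBTMysqlDriver://*******")
props.put("dbtable", "mysql.user")
props.put("driver", "CBTMysqlDriver")
val df = sqlContext.read.format("jdbc").options(props).load()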

4)  Nonstandard Hive table names

A few business teams use Hive table names that contain dots, for example mydatabase.my.table. Spark 1.5.2 enforces a check against this; the relevant code is:
/**
   * It is not allowed to specifiy database name for tables stored in [[SimpleCatalog]].
   * We use this method to check it.
   */
  protected def checkTableIdentifier(tableIdentifier: Seq[String]): Unit = {
    if (tableIdentifier.length > 1) {
      throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
        "for temporary tables. If the table name has dots (.) in it, please quote the " +
        "table name with backticks (`).")
    }
  }
}

Naming conventions need to be taken seriously. As the error message suggests, a table name that does contain dots has to be quoted with backticks; a brief sketch follows.
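A minimal sketch of that quoting (the table name here is hypothetical):

// A temporary table whose name contains a dot must be quoted with backticks,
// otherwise the dot is parsed as a database qualifier and the
// AnalysisException above is thrown.
val df = sqlContext.range(0, 10)
df.registerTempTable("my.table")
sqlContext.sql("SELECT * FROM `my.table`").show()   // works
// sqlContext.sql("SELECT * FROM my.table").show()  // fails with the AnalysisException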


5)  Encrypting the hive_metastore connection password

Spark connects to the Hive metastore via hive-site.xml, and that file exposes the metastore username and password to users, which causes two problems:

1)   Any user familiar with Spark can obtain the metastore database password;

2)   Any user who obtains hive-site.xml can submit Spark jobs to the cluster using Spark jars other than the version we sanction;

Based on these two points, we encrypt the metastore database password and decrypt it inside the Spark code. Reading the following source:

protected[hive] lazy val metadataHive: ClientInterface = {
  val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)

  // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
  // into the isolated client loader
  val metadataConf = new HiveConf()
  val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
  logInfo("default warehouse location is " + defaultWarehouseLocation)

  // `configure` goes second to override other settings.
  val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap ++ configure
  .........
    logInfo(
      s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using $jars")
    new IsolatedClientLoader(
      version = metaVersion,
      execJars = jars.toSeq,
      config = allConfig,
      isolationOn = true,
      barrierPrefixes = hiveMetastoreBarrierPrefixes,
      sharedPrefixes = hiveMetastoreSharedPrefixes)
  }
  isolatedLoader.client
}
metadataHive loads the system hive-site.xml through HiveConf(), then passes metadataConf on via the allConfig variable, from which IsolatedClientLoader creates the state for the metastore connection. So we only need to read the HiveConf.ConfVars.METASTOREPWD.varname entry from metadataConf, decrypt it, and write the decrypted value back under HiveConf.ConfVars.METASTOREPWD.varname. The modified code is as follows:
protected[hive] lazy val metadataHive: ClientInterface = {
  val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)

  // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
  // into the isolated client loader
  val metadataConf = new HiveConf()
  // added by Ricky
  val passwd = metadataConf.get(HiveConf.ConfVars.METASTOREPWD.varname)
  val passWord = PasswdDecrypt(passwd.toString) // decryption routine; choose your own cipher
  metadataConf.set(HiveConf.ConfVars.METASTOREPWD.varname, passWord)
  hiveconf.set(HiveConf.ConfVars.METASTOREPWD.varname, passWord) // reset the global password as well
  // end by Ricky
  val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
  logInfo("default warehouse location is " + defaultWarehouseLocation)

6)  Spark SQL does not support read-permission control on Hive tables

Spark SQL can read every Hive table; the community has run into the same problem.

An Intel engineer submitted a patch (https://issues.apache.org/jira/browse/SPARK-8321); merging it upstream is still under discussion.

The patch's approach is to add an authorization step to the execution plan that uses Hive's AuthorizerV2 mechanism to authorize the current logicalPlan. We currently use AuthorizerV1, so adopting the patch directly would require upgrading our Authorizer.

Our short-term workaround is to call AuthorizerV1 from the SQL-parsing step to check permissions on SELECT statements. The drawback is that we have to generate Hive's logicalPlan just for the permission check; this is still under test.

7)  Dynamic Resource Allocation errors

Test with the following code:

sc.parallelize(1 to 2000000000,20).map(x=>x%3-0.1).reduce(_+_)
sc.parallelize(1 to 2000000000,40).map(x=>x%3-0.1).reduce(_+_)
sc.parallelize(1 to 2000000000,80).map(x=>x%3-0.1).reduce(_+_)

When executors exit, the driver's stderr shows the following errors:

16/02/17 17:48:32 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:50558] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
16/02/17 17:48:33 ERROR YarnScheduler: Lost executor 4 on namenode1-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:33 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:56181] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
16/02/17 17:48:33 ERROR YarnScheduler: Lost executor 1 on namenode1-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:33 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:39840] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
16/02/17 17:48:34 ERROR YarnScheduler: Lost executor 5 on namenode2-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:34 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:33914] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
16/02/17 17:48:34 ERROR YarnScheduler: Lost executor 3 on slave01-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:34 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:52934] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
16/02/17 17:48:34 ERROR YarnScheduler: Lost executor 8 on namenode1-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:34 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:55408] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
16/02/17 17:48:37 ERROR YarnScheduler: Lost executor 7 on slave01-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:37 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:39890] has failed, address is now gated for [5000] ms. Reason: [Disassociated]

We are still investigating this error; for this upgrade we decided not to enable Dynamic Resource Allocation.
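For reference, a typical dynamic-allocation configuration looks roughly like the sketch below (the values are illustrative assumptions, not taken from the original; the external shuffle service also has to be running on every NodeManager):

import org.apache.spark.SparkConf

// Illustrative dynamic-allocation settings for Spark on YARN.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")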

8)  Using spark.cores.max to spread data

On YARN, derive the parallelism from spark.executor.instances instead:

val maxCores = conf.get("spark.executor.instances").toInt
if (maxCores > 0) {
  conf.set("spark.default.parallelism", (3 * maxCores).toString)
  conf.set("spark.sql.shuffle.partitions", (3 * maxCores).toString)
}
Do not use the following; spark.cores.max has no effect in YARN mode:

val maxCores = conf.get("spark.cores.max").toInt
if (maxCores > 0) {
  conf.set("spark.default.parallelism", (3 * maxCores).toString)
  conf.set("spark.sql.shuffle.partitions", (3 * maxCores).toString)
}
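A fuller sketch (our assumption, not from the original): on YARN the total core count is spark.executor.instances multiplied by spark.executor.cores, so the parallelism can be derived from both when they are set:

// Derive parallelism from the total core count rather than the executor count
// alone (spark.executor.cores falls back to 1 here if unset).
val numExecutors = conf.get("spark.executor.instances", "0").toInt
val coresPerExecutor = conf.get("spark.executor.cores", "1").toInt
val totalCores = numExecutors * coresPerExecutor
if (totalCores > 0) {
  conf.set("spark.default.parallelism", (3 * totalCores).toString)
  conf.set("spark.sql.shuffle.partitions", (3 * totalCores).toString)
}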