1. 程式人生 > >大資料之Spark(八)--- Spark閉包處理,Spark的應用的部署模式,Spark叢集的模式,啟動Spark On Yarn模式,Spark的高可用配置

大資料之Spark(八)--- Spark閉包處理,Spark的應用的部署模式,Spark叢集的模式,啟動Spark On Yarn模式,Spark的高可用配置

一、Spark閉包處理
------------------------------------------------------------
    RDD,resilient distributed dataset,彈性(容錯)分散式資料集。
    分割槽列表,function,dep Option(分割槽類, Pair[Key,Value]),首選位置。

    執行job時,spark將rdd打碎變換成tasks,每個task由一個executor執行。執行
    之前,spark會進行task的閉包(closure)計算。閉包是指標對executor可見的
    變數和方法,將他們封裝成一個包,以備在rdd的foreach中進行計算。閉包就是將包序列化,然後傳送給每個
    executor.

    local模式下,所有spark程式執行在同一JVM中,共享物件,counter是可以累加的。
    原因是所有executor指向的是同一個引用。

    cluster模式下,不可以,counter是閉包處理的。每個節點對driver上的counter是
    不可見的。只能看到自己內部序列化的counter副本。


二、Spark的應用的部署模式[客戶端模式和叢集模式]
--------------------------------------------------------------------
    a.spark-submit --class xxx xx.jar --deploy-mode (client | cluster)
        --deploy-mode指定是否部署的driver程式,是在worker節點上還是在client主機上。

    b.[client]
        driver執行在client主機上。client可以不在cluster中。

    c.[cluster]
        driver程式提交給spark cluster的某個worker節點來執行。
        worker是cluster中的一員。
        匯出的jar需要放置到所有worker節點都可見的位置(如hdfs)才可以。

    d.不論哪種方式,rdd的運算都在worker執行

    f.驗證Spark的部署模式
        1)啟動spark叢集

        2)程式設計
            
package com.test.spark.scala;
            import java.net.{InetAddress, Socket}

            import org.apache.spark.{SparkConf, SparkContext}

            /**
              *
              */
            object DeployModeTest {

                def printInfo(str:String): Unit ={
                    val ip = InetAddress.getLocalHost.getHostAddress;
                    val sock = new Socket("192.168.231.205",8888);
                    val out = sock.getOutputStream;
                    out.write((ip + " : " + str + "\r\n").getBytes())
                    out.flush()
                    sock.close();
                }

                def main(args: Array[String]): Unit = {
                    val conf = new SparkConf()
                    conf.setAppName("DeployModeTest")
                    conf.setMaster("spark://s201:7077")
                    val sc = new SparkContext(conf)
                    printInfo("hello world") ;

                    val rdd1 = sc.parallelize(1 to 10,3);
                    val rdd2 = rdd1.map(e=>{
                        printInfo(" map : " + e)
                        e * 2 ;
                    })
                    val rdd3 = rdd2.repartition(2)
                    val rdd4 = rdd3.map(e=>{
                        printInfo(" map2 : " + e)
                        e
                    })

                    val res = rdd4.reduce((a,b)=>{
                        printInfo("reduce : " + a + "," + b)
                        a + b ;
                    })
                    printInfo("driver : " + res + "")
                }
            }
        3)打包
            jar
            對於cluster部署模式,必須要將jar放置到所有worker都能夠看到的地方才可以,例如hdfs。

        4)複製到s100,並分發到所有節點的相同目錄下

        5)提交job到spark叢集
            $s100> spark-submit --class com.test.spark.scala.DeployModeTest --master spark://s100:7077 --deploy-mode client TestSpark-2-1.0-SNAPSHOT.jar

            //上傳jar到hdfs
            $> spark-submit --class com.test.spark.scala.DeployModeTest --master spark://s100:7077 --deploy-mode cluster hdfs://s500:8020/data/spark/TestSpark-2-1.0-SNAPSHOT.jar


三、Spark叢集的模式
-----------------------------------------------------
    1.Spark叢集模式的區別主要是ClusterMaster的什麼
        如果是SparkMaster,就是local或者standalone或者local模式
        如果是MesosMaster,就是Mesos模式
        如果是ResourceManagerMaster,就是Yarn模式

    2.開啟模式
        yarn模式:       --master yarn(yarn-site.xml)
        standalone:     --master spark://s100:7077
        mesos:          --master mesos//xxx:xxx


    2.[local]/[standalone]
        使用SparkMaster程序作為管理節點.

    3.[mesos]
        使用mesos的master作為管理節點。

    4.[yarn]
        a.使用hadoop的ResourceManager作為sparkMaster節點。不用spark的master.

        b.不需要啟動spark-master節點。也不需要。

        c.確保HADOOP_CONF_DIR和YARN_CONF_DIR環境變數指向了包含了hadoop配置檔案的目錄,這些配置檔案可以確保,是向hdfs寫入資料,並且確定是連線到yarn的resourcemanager.
            --> cp core-site.xml hdfs-site.xml yarn-site.xml 到 spark/conf下
            --> 分發到所有節點
            --> 配置HADOOP_CONF_DIR和YARN_CONF_DIR環境變數
                修改/soft/sparl/conf/spark-env.sh
                -----------------------------------
                export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop
                export SPARK_EXECUTOR_INSTANCES=3
                export SPARK_EXECUTOR_CORES=1
                export SPARK_EXECUTOR_MEMORY=500M
                export SPARK_DRIVER_MEMORY=500M

            ---> 分發到所有節點

        d.這些配置分發到yarn叢集的所有節點,並且確保所有節點的配置是一致的。配置中設定的所有屬性確保所有節點都能找到。

        f.在yarn上執行spark應用,可以採用兩種部署模式。
            a.cluster部署模式:driver執行在Yarn-Appmaster程序中。
            b.client部署模式:driver執行在client程序中,Yarn-AppMaster只用於請求資源。


四、啟動Spark On Yarn模式
-------------------------------------------------------------
    1.修改程式碼並打包
        
package com.test.spark.scala;
        import java.net.{InetAddress, Socket}
        import org.apache.spark.{SparkConf, SparkContext}

        /**
          *
          */
        object DeployModeTest {

            def printInfo(str:String): Unit ={
                val ip = InetAddress.getLocalHost.getHostAddress;
                val sock = new Socket("192.168.231.205",8888);
                val out = sock.getOutputStream;
                out.write((ip + " : " + str + "\r\n").getBytes())
                out.flush()
                sock.close();
            }

            def main(args: Array[String]): Unit = {
                val conf = new SparkConf()
                conf.setAppName("DeployModeTest")
                conf.setMaster("yarn")
                val sc = new SparkContext(conf)
                printInfo("hello world") ;

                val rdd1 = sc.parallelize(1 to 10,3);
                val rdd2 = rdd1.map(e=>{
                    printInfo(" map : " + e)
                    e * 2 ;
                })
                val rdd3 = rdd2.repartition(2)
                val rdd4 = rdd3.map(e=>{
                    printInfo(" map2 : " + e)
                    e
                })

                val res = rdd4.reduce((a,b)=>{
                    printInfo("reduce : " + a + "," + b)
                    a + b ;
                })
                printInfo("driver : " + res + "")
            }
        }

    2.拷貝配置檔案,配置HADOOP_CONF_DIR和YARN_CONF_DIR環境變數
        a.拷貝 core-site.xml hdfs-site.xml yarn-site.xml 到 spark/conf下,並分發到所有節點

        b.修改/soft/spark/conf/spark-env.sh,並分發
            export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop
            export SPARK_EXECUTOR_INSTANCES=3
            export SPARK_EXECUTOR_CORES=1
            export SPARK_EXECUTOR_MEMORY=500M
            export SPARK_DRIVER_MEMORY=500M

    3.將Spark的jars檔案放到hdfs上[因為Hadoop叢集中預設是沒有spark的jar包的,所以,需要手動put上去,不然每次系統都會將jars打包上傳到臨時目錄]
        a.將/soft/spark/jars資料夾上傳到hdfs上
            $> hdfs dfs -put /soft/spark/jars /data/spark

    4.配置spark屬性檔案,並分發到所有節點
        [/spark/conf/spark-defaults.conf]
        spark.yarn.jars hdfs://mycluster/data/spark/jars/*.jar
        spark.yarn.am.memory=512M
        spark.driver.memory=512M
        spark.executor.memory=512M

    5.在s500上開啟nc
        $> nc -lk 8888

    6.提交作業
        //yarn + cluster
        spark-submit --class com.test.spark.scala.DeployModeTest --master yarn --deploy-mode cluster hdfs://mycluster/data/spark/TestSpark-2-1.0-SNAPSHOT.jar

        //yarn + client
        spark-submit --class com.test.spark.scala.DeployModeTest --master yarn --deploy-mode client TestSpark-2-1.0-SNAPSHOT.jar


    7.如果是yarn模式啟動,是不需要啟動spark叢集的


五、SparkMaster_HA
---------------------------------------------------------------
    1.[描述]
        只針對standalone和mesos叢集部署情況,因為yarn模式已經有HA了
        使用zk連線多個master並存儲state。
        master主要負責排程。

    2.[配置配置檔案並分發到所有節點]
        [spark/conf/spark-env.sh]
        export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=s100:2181,s200:2181,s300:2181 -Dspark.deploy.zookeeper.dir=/spark"
        spark.deploy.recoveryMode=ZOOKEEPER
        spark.deploy.zookeeper.url=s100:2181,s200:2181,s300:2181
        spark.deploy.zookeeper.dir=/spark/ha

    3.[啟動方式]
        a.直接在多個節點上啟動master程序,HA會自動從zk中新增或者刪除Master節點.
            $s100> ./start-all.sh
            $s500> ./start-master.sh

        b.也可通過指定多個master連線地址實現:spark://host1:port1,host2:port2.
            程式碼中使用spark ha
            conf.setMaster("spark://s100:7077,s500:7077")