1. 程式人生 > >Spark local/standalone/yarn/遠端除錯-執行WordCount

Spark local/standalone/yarn/遠端除錯-執行WordCount

local

直接啟動spark-shell

./spark-shell --master local[*]

編寫scala程式碼

sc.textFile("/input/file01.txt")
res0.cache()
res0.count
val wd=res0.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
wd.collect.foreach(println)

在輸出日誌中檢視WordCount結果

在spark監控頁面檢視看,綠色的圓圈代表快取在記憶體中
這裡寫圖片描述
這裡寫圖片描述
這裡寫圖片描述

standalone

在spark-env.sh中新增

export
SPARK_MASTER_IP master export SPARK_MASTER_PORT 7077 export SPARK_WORKER_CORES=2 export SPARK_WORK_INSTANCES=1 export SPARK_WORKER_MEMORY=3g export JAVA_HOME=/usr/local/java/jdk1.7.0_75

在slaves中新增

master
slave1
slave2

啟動spark叢集,在sbin目錄下

./start-all.sh

檢視7077埠已經啟動
這裡寫圖片描述

然後啟動spark-shell編寫之前的程式碼檢視處理過程,可以多寫幾句看看效果,例如
這裡寫圖片描述

這裡寫圖片描述

yarn

使用yarn-client模式提交jar包到yarn

./spark-submit --master yarn-client /usr/local/spark/examples/spark1-1.0-SNAPSHOT.jar 

yarn監控頁面檢視
這裡寫圖片描述

使用debug監聽埠的方式除錯程式

當資料量很大時,用這種方式在叢集中執行程式,在本地設定斷點進行debug
run.sh

/usr/local/spark/bin/spark-submit \
--master yarn-client \
--driver-cores 8 \
--driver-memory 1G \
--num-executors 2
\
--executor-memory 1G \ --executor-cores 4 \ --driver-java-options '-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=9887' \ /usr/local/spark/examples/spark1-1.0-SNAPSHOT.jar

執行run.sh

這裡寫圖片描述

在idea中新增remote設定debug主機名和埠號(run.sh中配置的,如上9887),啟動debug

這裡寫圖片描述
這裡寫圖片描述

spark上wordcount的java實現

SparkUtil工具類提供獲取JavaSparkContext與外部儲存方式讀取RDD,檔案讀取

public class SparkUtil {
    /**
     * 獲取JavaSparkContext
     */
    public static JavaSparkContext getJavaSparkContext(String appName, String logLeverl){
        SparkConf conf=new SparkConf().setAppName(appName);
//        conf.setMaster("local[*]");
        SparkContext sc=new SparkContext(conf);//這步是否可以省略
        JavaSparkContext jsc =new JavaSparkContext(sc);
        return jsc;
    }
    /**
     * 外部儲存方式讀取RDD,檔案讀取
     */
    public static JavaRDD getRddExternal(JavaSparkContext jsc ,String filePath){
        if (null==jsc)
            return null;
        return jsc.textFile(filePath);
    }
}

建立JavaSparkContext,對Rdd進行操作實現word count

public class WordCount {

    private static org.slf4j.Logger logger = LoggerFactory.getLogger(WordCount.class);

    public static void main(String[] args){
        JavaSparkContext jsc =SparkUtil.getJavaSparkContext("WordCount","WARN");
        JavaRDD<String> wordData=SparkUtil.getRddExternal(jsc,"/input/file01.txt");
        wordCount(wordData);
    }

    public static void wordCount(JavaRDD wordData){
        JavaRDD<String> wordRdd=wordData.flatMap(new FlatMapFunction<String,String>() {
            public Iterable call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        JavaPairRDD<String,Integer> wordMapToPair=wordRdd.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s,1);
            }
        });

        JavaPairRDD<String ,Integer> wordReduceByKey=wordMapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer1, Integer integer2) throws Exception {
                return integer1.intValue()+integer1.intValue();
            }
        });

        wordReduceByKey.sortByKey().foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2._1+"="+stringIntegerTuple2._2);
                logger.info(stringIntegerTuple2._1+"="+stringIntegerTuple2._2);
            }
        });
    }
}