
Packaging a Java Spark Program into a Jar with Maven, Testing Locally on Windows, and Running It on a Cluster

Author: 翁鬆秀





Learning Spark is a long road. Having set up the local environment and run a few of Spark's bundled examples in Eclipse, I now want to use Maven to package a program into a jar, test it locally, and then upload it to the server cluster and submit it there.
(Earlier post: Setting up a Spark development environment locally on Windows)

The road ahead is long and far; I will keep searching, high and low.

Step 1: Package the Jar with Maven

JavaWordCount code for local testing on Windows:

package code.demo.spark;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class JavaWordCount {
	private static final Pattern SPACE = Pattern.compile(" ");
	public static void main(String[] args) throws Exception {
	
		// Point hadoop.home.dir at the local Hadoop directory (needed on Windows)
		System.setProperty("hadoop.home.dir", "F:\\home\\hadoop");
		SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);
		
		// Read the file to be counted from the local disk
		JavaRDD<String> lines = ctx.textFile("F:/home/spark/README.md");
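		// Split each line on spaces into individual words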
		JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 1L;
			
			@Override
			public Iterable<String> call(String s) {
				return Arrays.asList(SPACE.split(s));
			}
		});
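		// Map each word to a (word, 1) pair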
		JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});
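		// Sum the counts for each word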
		JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});

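		// Collect the (word, count) pairs to the driver and print them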
		List<Tuple2<String, Integer>> output = counts.collect();
		for (Tuple2<?, ?> tuple : output) {
			System.out.println(tuple._1() + ": " + tuple._2());
		}
		ctx.stop();
		ctx.close();
	}
}
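For a small file like this, collecting the results and printing them on the driver is fine. For larger inputs, a possible alternative (my own sketch, not part of the original program; the output path is made up) is to write the pairs out instead of calling collect():

		// Hypothetical alternative to collect(): write the (word, count) pairs
		// to an output directory instead of pulling them all back to the driver.
		// The directory must not exist yet; the path here is an assumption.
		counts.saveAsTextFile("F:/home/spark/wordcount-output");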

Press Win + R to open the Run dialog, type cmd to open a command prompt, then change into the directory where the project lives. Mine is:

E:\code\JavaWorkspace\SparkDemo

Compile the program:

mvn compile

Package the program:

mvn package

If there is nothing wrong with the project, such as missing pieces, you will see "BUILD SUCCESS" once packaging finishes, meaning the build succeeded. The packaged jar can then be found in the target folder of the Maven project.

Step 2: Test the Spark Program Locally

Open a command prompt and change into the bin directory under the Spark installation directory. My Spark installation path is:

F:\home\spark-1.6.3-bin-hadoop2.6

To make submitting easier, I put the packaged jar in the root of the F: drive:

F:\ExampleSpark-1.0-SNAPSHOT.jar

Then run the command:

spark-submit --class code.demo.spark.JavaWordCount --master local F:\\ExampleSpark-1.0-SNAPSHOT.jar


Step 3: Upload the Jar to the Cluster

Preparation: since the WordCount program needs to read a file, for convenience we first upload word.txt, the file to be counted, to HDFS.
Command format: hadoop fs -put <local path> <HDFS path>
The command is:

hadoop fs -put /home/hmaster/word.txt hdfs://hadoop-mn01:9000/user/hmaster/word.txt

Check whether the upload succeeded:

hadoop fs -ls hdfs://hadoop-mn01:9000/user/hmaster

If word.txt shows up, the upload succeeded; if not, it failed. One likely reason for a failed upload is that the target path does not exist on HDFS, for example the hmaster folder is missing. Create the hmaster folder under the user directory on HDFS:

hadoop fs -mkdir /user/hmaster

Cluster version of the code: it reads word.txt, the file whose words are to be counted, directly from HDFS.


package code.demo.spark;

import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class JavaWordCount {
	private static final Pattern SPACE = Pattern.compile(" ");
	public static void main(String[] args) throws Exception {
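		// No setMaster() here: the master URL is supplied with --master when the jar is submitted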
		SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
		JavaSparkContext ctx = new JavaSparkContext(sparkConf);
		
		// Read word.txt from HDFS and count its words
		JavaRDD<String> lines = ctx.textFile("hdfs://hadoop-mn01:9000/user/hmaster/word.txt");
		JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 1L;
			@Override
			public Iterable<String> call(String s) {
				return Arrays.asList(SPACE.split(s));
			}
		});
		JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
			private static final long serialVersionUID = 1L;
			@Override
			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});
		JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
			private static final long serialVersionUID = 1L;
			@Override
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});
		List<Tuple2<String, Integer>> output = counts.collect();
		for (Tuple2<?, ?> tuple : output) {
			System.out.println(tuple._1() + ": " + tuple._2());
		}
		ctx.stop();
		ctx.close();
	}
}
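The HDFS path above is hardcoded. As a possible refinement (my own sketch, not part of the original program), the input path could be taken from the command line so the same jar works for any file; the path would then be appended after the jar path in the spark-submit command:

		// Hypothetical variant: take the input path as the first program argument,
		// falling back to the hardcoded HDFS path when none is given.
		String inputPath = args.length > 0
				? args[0]
				: "hdfs://hadoop-mn01:9000/user/hmaster/word.txt";
		JavaRDD<String> lines = ctx.textFile(inputPath);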

To upload the jar from local Windows to the Linux server cluster I use WinSCP; there are other ways besides WinSCP, which I will not list one by one.

Step 4: Submit the Jar on the Cluster

Open a terminal on the Spark master node, change into the Spark installation directory, and run the submit command:

./bin/spark-submit --class code.demo.spark.JavaWordCount --master spark://hadoop-mn01:7077 /home/hmaster/WordCount.jar

Explanation of the command:
./bin/spark-submit: the submit command, which submits the application
--class code.demo.spark.JavaWordCount: the application's main class
--master spark://hadoop-mn01:7077: the master to run on, unlike the local master used for local testing
/home/hmaster/WordCount.jar: the path to the jar
After submitting, once the program finishes running you can see the word-count results:
Hello WordCount!