Packaging a Java Spark Program into a Jar with Maven, Testing It Locally on Windows, and Running It on a Cluster
Author: 翁鬆秀
The road to learning Spark is a long one. Having set up a local environment and run a few of Spark's bundled examples in Eclipse, I now want to package a program into a jar with Maven, test it locally, and then upload it to the server cluster and submit it there.
Long indeed is the road ahead; I shall search for it high and low.
Step 1: Package the Jar with Maven
The local Windows test version of JavaWordCount:
package code.demo.spark;

import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // Point hadoop.home.dir at a local Hadoop directory
        // (on Windows this directory is expected to contain bin\winutils.exe)
        System.setProperty("hadoop.home.dir", "F:\\home\\hadoop");
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        // Read the file to be counted from the local disk
        JavaRDD<String> lines = ctx.textFile("F:/home/spark/README.md");
        // Split each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s));
            }
        });
        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // Sum the counts for each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();
        ctx.close();
    }
}
Press Win + R, type cmd to open a command prompt, and change to the project directory. Mine is:
E:\code\JavaWorkspace\SparkDemo
Compile the program:
mvn compile
Package the program:
mvn package
If nothing is wrong with the program, you will see "BUILD SUCCESS" when packaging finishes, meaning the build succeeded; the packaged jar then sits in the Maven project's target folder.
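For reference, here is a minimal pom.xml sketch for this kind of project; it is my assumption, not the original project file. The artifactId matches the jar name used below, while the groupId, the Java level, and the Spark/Scala versions (1.6.3 / 2.10, chosen to match the spark-1.6.3-bin-hadoop2.6 installation used later) should be adapted to your own environment:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>code.demo</groupId>
    <artifactId>ExampleSpark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- Spark core; scope "provided" keeps it out of the jar because spark-submit supplies it at runtime -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.3</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Compile for Java 7, the baseline for Spark 1.6 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Running mvn clean package performs the compile step as part of the same build, so the two commands above can also be collapsed into one.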
Step 2: Test the Spark Program Locally
Open a command prompt and change into the bin directory under the Spark installation. My Spark install path is:
F:\home\spark-1.6.3-bin-hadoop2.6
To make submitting easier, I put the packaged jar in the root of the F: drive:
F:\ExampleSpark-1.0-SNAPSHOT.jar
Then run:
spark-submit --class code.demo.spark.JavaWordCount --master local F:\\ExampleSpark-1.0-SNAPSHOT.jar
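By default --master local runs the program in a single thread. As an aside not in the original walkthrough, you can let Spark use two threads with local[2], or all available cores with local[*]:
spark-submit --class code.demo.spark.JavaWordCount --master local[*] F:\\ExampleSpark-1.0-SNAPSHOT.jar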
Step 3: Upload the Jar to the Cluster
Preparation: since the WordCount program needs to read a file, for convenience we first upload the file to be counted, word.txt, to HDFS.
Command format: hadoop fs -put <local path> <HDFS path>
The command is:
hadoop fs -put /home/hmaster/word.txt hdfs://hadoop-mn01:9000/user/hmaster/word.txt
Check whether the upload succeeded:
hadoop fs -ls hdfs://hadoop-mn01:9000/user/hmaster
If word.txt is listed, the upload succeeded; otherwise it failed. A likely reason for failure is that the HDFS path does not exist, e.g. the hmaster folder is missing. Create it under HDFS's user directory:
hadoop fs -mkdir /user/hmaster
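A handy variant: on Hadoop 2.x, hadoop fs -mkdir accepts the -p flag, which creates any missing parent directories along the way:
hadoop fs -mkdir -p /user/hmaster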
Cluster version of the code, which reads word.txt to be counted directly from HDFS:
package code.demo.spark;

import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        // Read word.txt from HDFS and count its words
        JavaRDD<String> lines = ctx.textFile("hdfs://hadoop-mn01:9000/user/hmaster/word.txt");
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s));
            }
        });
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();
        ctx.close();
    }
}
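As an aside that is not part of the original walkthrough: if both your build and the cluster's JVMs run Java 8, the same logic can be written with lambdas, because Spark 1.6's Java function interfaces each have a single abstract method. A minimal sketch (note that in Spark 1.6 the flatMap function returns an Iterable, whereas in Spark 2.x it returns an Iterator):

package code.demo.spark;

import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.regex.Pattern;

public final class JavaWordCountLambda {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        JavaSparkContext ctx = new JavaSparkContext(new SparkConf().setAppName("JavaWordCountLambda"));
        // Same pipeline as above: split lines into words, pair each word with 1, sum per word
        JavaPairRDD<String, Integer> counts = ctx
                .textFile("hdfs://hadoop-mn01:9000/user/hmaster/word.txt")
                .flatMap(s -> Arrays.asList(SPACE.split(s)))
                .mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);
        for (Tuple2<?, ?> tuple : counts.collect()) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();
        ctx.close();
    }
}

If you package this class instead, remember to change the --class argument in the spark-submit commands below to code.demo.spark.JavaWordCountLambda.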
To upload the jar from local Windows to the Linux server cluster I used WinSCP; there are other ways besides WinSCP, which I won't enumerate here.
Step 4: Submit the Jar on the Cluster
Open a terminal on the Spark master node, change into the Spark installation directory, and run the submit command:
./bin/spark-submit --class code.demo.spark.JavaWordCount --master spark://hadoop-mn01:7077 /home/hmaster/WordCount.jar
The command explained:
./bin/spark-submit: the submit command, which launches the application
--class code.demo.spark.JavaWordCount: the application's main class
--master spark://hadoop-mn01:7077: the master to run against, unlike the local master used for local testing
/home/hmaster/WordCount.jar: the path to the jar
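As an extra not covered in the original article, spark-submit also accepts resource flags; for example, on a standalone master you can cap the job at two cores in total with 1 GB of memory per executor:
./bin/spark-submit --class code.demo.spark.JavaWordCount --master spark://hadoop-mn01:7077 --executor-memory 1G --total-executor-cores 2 /home/hmaster/WordCount.jar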
After submitting, once the program has finished running you can see the counted results:
Hello WordCount!