好程式設計師大資料教程:SparkShell和IDEA中編寫Spark程式
好程式設計師大資料教程:SparkShell和IDEA中編寫Spark程式,spark-shell是Spark自帶的互動式Shell程式,方便使用者進行互動式程式設計,使用者可以在該命令列下用Scala編寫Spark程式。spark-shell程式一般用作Spark程式測試練習來用。spark-shell屬於Spark的特殊應用程式,我們可以在這個特殊的應用程式中提交應用程式
spark-shell啟動有兩種模式,local模式和cluster模式,分別為
local模式:
spark-shell
local模式僅在本機啟動一個SparkSubmit程序,沒有與叢集建立聯絡,雖然程序中有SparkSubmit但是不會被提交到叢集紅
Cluster模式(叢集模式):
spark-shell \
--master spark://hadoop01:7077 \
--executor-memory 512m \
--total-executor-cores 1
後兩個命令不是必須的 --master這條命令是必須的(除非在jar包中已經指可以不指定,不然就必須指定)
退出shell
千萬不要ctrl+c spark-shell 正確退出 :quit 千萬不要ctrl+c退出 這樣是錯誤的 若使用了ctrl+c退出 使用命令檢視監聽埠 netstat - apn | grep 4040 在使用kill -9 埠號 殺死即可
3.25.11 spark2.2shell和spark1.6shell對比
ps:啟動spark-shell若是叢集模式,在webUI會有一個一直執行的任務
通過IDEA建立Spark工程
ps:工程建立之前步驟省略,在scala中已經講解,直接預設是建立好工程的
對工程中的pom.xml檔案配置
<!-- 宣告公有的屬性 -->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
<hadoop.version>2.7.1</hadoop.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>
<!-- 宣告並引入公有的依賴 -->
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
Spark實現WordCount程式
Scala版本
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("dri/wordcount").setMaster("local[*]")
//建立sparkContext物件
val sc = new SparkContext(conf)
//通過sparkcontext物件就可以處理資料
//讀取檔案 引數是一個String型別的字串 傳入的是路徑
val lines: RDD[String] = sc.textFile(“dir/wordcount”)
//切分資料
val words: RDD[String] = lines.flatMap(_.split(" "))
//將每一個單詞生成元組 (單詞,1)
val tuples: RDD[(String, Int)] = words.map((_,1))
//spark中提供一個運算元 reduceByKey 相同key 為一組進行求和 計算value
val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_)
//對當前這個結果進行排序 sortBy 和scala中sotrBy是不一樣的 多了一個引數
//預設是升序 false就是降序
val sorted: RDD[(String, Int)] = sumed.sortBy(_._2,false)
//將資料提交到叢集儲存 無法返回值
sorted.foreach(println)
//回收資源停止sc,結束任務
sc.stop()
}
}
Java版本
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class JavaWordCount {
public static void main(String[] args) {
//1.先建立conf物件進行配置主要是設定名稱,為了設定執行模式
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
//2.建立context物件
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("dir/file");
//進行切分資料 flatMapFunction是具體實現類
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
List<String> splited = Arrays.asList(s.split(" "));
return splited.iterator();
}
});
//將資料生成元組
//第一個泛型是輸入的資料型別 後兩個引數是輸出引數元組的資料
JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String,
Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
//聚合
JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer,
Integer>() {
@Override
//第一個Integer是相同key對應的value
//第二個Integer是相同key 對應的value
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
//因為Java api沒有提供sortBy運算元,此時需要將元組中的資料進行位置調換,然後在排序,排完序在換回
//第一次交換是為了排序
JavaPairRDD<Integer, String> swaped = sumed.mapToPair(new PairFunction<Tuple2<String,
Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {
return tup.swap();
}
});
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//第二次交換是為了最終結果 <單詞,數量>
JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer,
String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception
{
return tuple2.swap();
}
});
System.out.println(res.collect());
res.saveAsTextFile("out1");
jsc.stop();
&nbs