
Spark configuration and word-count


Spark
------------
    A lightning-fast cluster computing engine.
    A fast, general-purpose engine for large-scale data processing.
    In-memory computing.
    

    [Speed]
    Computes up to 100x faster than Hadoop MapReduce.
    Spark has an advanced DAG (Directed Acyclic Graph) execution engine.

    [Ease of use]
    Write apps in Java, Scala, Python, R, or SQL.
    Provides 80+ high-level operators, making it easy to build parallel applications.
    Can also be used interactively from the Scala, Python, and R shells.

    [Generality]
    Combine SQL, streaming, and complex analytics in a single application.
    Spark provides a stack of libraries including SQL, MLlib, GraphX, and Spark Streaming.

    [Architecture]
    Spark Core
    Spark SQL
    Spark Streaming
    Spark MLlib
    Spark GraphX

    [Runs everywhere]
    Spark can run on Hadoop (YARN), Mesos, standalone, or in the cloud.
    It can access many data sources: HDFS, HBase, Hive, Cassandra, S3.
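    As a small illustration of the data-source point, the same textFile call accepts different URI schemes; a minimal sketch (the paths are hypothetical, and S3/HBase/Hive/Cassandra access additionally needs the matching connector libraries on the classpath):
        val localRdd = sc.textFile("file:///home/centos/1.txt")             //local file system
        val hdfsRdd  = sc.textFile("hdfs://mycluster/user/centos/1.txt")    //HDFS
        //s3a://... works the same way once the hadoop-aws connector is available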

    
Spark cluster deployment modes
------------------
    1. local
    2. standalone
    3. mesos
    4. yarn

Installing Spark [local mode]
----------------
    1. Download spark-2.1.0-bin-hadoop2.7.tgz
    2. Untar it
    3. Configure the environment variables
        [/etc/profile]
        ...
        export SPARK_HOME=/soft/spark
        export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
    4. Reload the profile
        $>source /etc/profile
    5. Enter the spark-shell
        $>spark/bin/spark-shell
        $scala>1 + 1
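    As a quick sanity check of the local install, a couple of throwaway expressions in spark-shell (a minimal sketch; any small collection works):
        scala>sc.version                         //prints the Spark version, e.g. 2.1.0
        scala>sc.parallelize(1 to 100).sum()     //runs a tiny local job; returns 5050.0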

RDD
----------------
    Resilient Distributed Dataset: equivalent to a collection in Java, such as a List.

Implementing word count
-----------------
    1. Step by step
        //1. Load the file
        scala>val rdd1 = sc.textFile("/home/centos/1.txt")
        //2. Flatten each line into words
        scala>val rdd2 = rdd1.flatMap(_.split(" "))
        //3. Pair each word with 1
        scala>val rdd3 = rdd2.map(w=>(w,1))
        //4. Aggregate all values under each key
        scala>val rdd4 = rdd3.reduceByKey(_+_)
        //5. Show the data
        scala>rdd4.collect()

    2. In one step
        $scala>sc.textFile("file:///home/centos/1.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

    3. Max temperature aggregation (step by step)
        //1. Load the file
        scala>val rdd1 = sc.textFile("/home/centos/temp.dat")
        //2. Transform each line into (year, temp)
        scala>val rdd2 = rdd1.map(line=>{ val arr = line.split(" ") ; (arr(0).toInt,arr(1).toInt) })
        //3. Aggregate by key, keeping the maximum value
        scala>val rdd3 = rdd2.reduceByKey((a,b)=> if(a > b) a else b)
        //4. Sort by year
        scala>val rdd4 = rdd3.sortByKey()
        //5. Show
        scala>rdd4.collect()
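    If you also want the counts ordered, sortBy can be appended before collecting; a small sketch building on rdd4 from the step-by-step word count above:
        scala>val sorted = rdd4.sortBy(_._2, ascending = false)   //order the pairs by count, descending
        scala>sorted.take(10)                                      //the ten most frequent words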

Writing a Spark program in IDEA
-------------------
    1. Create a Java project and add the Scala library
    2. Add Maven support and introduce the dependency
        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
            <!-- add your project's groupId / artifactId / version here -->
            <dependencies>
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.11</artifactId>
                    <version>2.1.0</version>
                </dependency>
            </dependencies>
        </project>
    3. Write the code

    [Scala version]
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Word count, Scala version.
      */
    object WCAppScala {
      def main(args: Array[String]): Unit = {
        //1. Create the Spark configuration object
        val conf = new SparkConf()
        conf.setAppName("wcApp")
        conf.setMaster("local")

        //2. Create the Spark context object
        val sc = new SparkContext(conf)

        //3. Load the file
        val rdd1 = sc.textFile("d:/mr/1.txt")

        //4. Flatten
        val rdd2 = rdd1.flatMap(_.split(" "))

        //5. Pair each word with 1
        val rdd3 = rdd2.map(w => (w, 1))

        //6. Reduce
        val rdd4 = rdd3.reduceByKey(_ + _)

        //7. Collect the data
        val arr = rdd4.collect()
        arr.foreach(println)
      }
    }

    [Java version]
    package com.oldboy.spark.java;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    /**
     * Word count, Java version.
     */
    public class WCAppJava {
        public static void main(String[] args) {
            //1. Create the configuration object
            SparkConf conf = new SparkConf();
            conf.setAppName("wcApp");
            conf.setMaster("local");

            //2. Create the Java context
            JavaSparkContext sc = new JavaSparkContext(conf);

            //3. Load the file
            JavaRDD<String> rdd1 = sc.textFile("d:/mr/1.txt");

            //4. Flatten
            JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
                public Iterator<String> call(String s) throws Exception {
                    String[] arr = s.split(" ");
                    return Arrays.asList(arr).iterator();
                }
            });

            //5. Pair each word with 1
            JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });

            //6. Reduce
            JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            //7. Collect
            List<Tuple2<String, Integer>> list = rdd4.collect();
            for (Tuple2<String, Integer> t : list) {
                System.out.println(t._1() + " : " + t._2());
            }
        }
    }
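    Both example programs hard-code the input path (d:/mr/1.txt). As a small variation, the path can be taken from the program arguments instead; a minimal sketch (the object name WCAppArgs is only for illustration, and the path is assumed to be passed as the first argument in the IDEA run configuration):

    import org.apache.spark.{SparkConf, SparkContext}

    object WCAppArgs {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("wcApp").setMaster("local")
        val sc = new SparkContext(conf)
        //args(0) is the input path, e.g. a local path or an hdfs:// URI
        sc.textFile(args(0))
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .collect()
          .foreach(println)
      }
    }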

Exercises
--------------
    1. Compute the maximum and minimum temperature in a single aggregation.
    2. Compute the maximum, minimum, and average temperature in a single aggregation.

    package com.oldboy.spark.java;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    import scala.Tuple4;

    import java.util.List;

    /**
     * Aggregate temperature data: per year, (max, min, avg, count).
     */
    public class TempAggJava {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setAppName("tempAggJava");
            conf.setMaster("local");

            JavaSparkContext sc = new JavaSparkContext(conf);

            //1. Load the file
            JavaRDD<String> rdd1 = sc.textFile("d:/mr/temp.dat");

            //2. Transform each line into (year, (max, min, avg, count))
            JavaPairRDD<Integer, Tuple4<Integer, Integer, Double, Integer>> rdd2 =
                rdd1.mapToPair(new PairFunction<String, Integer, Tuple4<Integer, Integer, Double, Integer>>() {
                    public Tuple2<Integer, Tuple4<Integer, Integer, Double, Integer>> call(String s) throws Exception {
                        String[] arr = s.split(" ");
                        int year = Integer.parseInt(arr[0]);
                        int temp = Integer.parseInt(arr[1]);
                        return new Tuple2<Integer, Tuple4<Integer, Integer, Double, Integer>>(
                            year, new Tuple4<Integer, Integer, Double, Integer>(temp, temp, new Double(temp), 1));
                    }
                });

            //3. Aggregate
            JavaPairRDD<Integer, Tuple4<Integer, Integer, Double, Integer>> rdd3 = rdd2.reduceByKey(
                new Function2<Tuple4<Integer, Integer, Double, Integer>,
                              Tuple4<Integer, Integer, Double, Integer>,
                              Tuple4<Integer, Integer, Double, Integer>>() {
                    public Tuple4<Integer, Integer, Double, Integer> call(
                            Tuple4<Integer, Integer, Double, Integer> v1,
                            Tuple4<Integer, Integer, Double, Integer> v2) throws Exception {
                        int max = Math.max(v1._1(), v2._1());
                        int min = Math.min(v1._2(), v2._2());
                        int count = v1._4() + v2._4();
                        //weighted average of the two partial averages
                        double avg = (v1._3() * v1._4() + v2._3() * v2._4()) / count;
                        return new Tuple4<Integer, Integer, Double, Integer>(max, min, avg, count);
                    }
                });

            //4. Collect
            List<Tuple2<Integer, Tuple4<Integer, Integer, Double, Integer>>> list = rdd3.collect();
            for (Tuple2<Integer, Tuple4<Integer, Integer, Double, Integer>> t : list) {
                System.out.println(t);
            }
        }
    }
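    For comparison, the same one-pass aggregation is much shorter in spark-shell; a minimal Scala sketch, assuming the same "year temperature" format of temp.dat:
        //per record keep (max, min, sum, count)
        scala>val rdd1 = sc.textFile("/home/centos/temp.dat")
        scala>val rdd2 = rdd1.map(line => { val arr = line.split(" "); (arr(0).toInt, (arr(1).toInt, arr(1).toInt, arr(1).toLong, 1L)) })
        scala>val rdd3 = rdd2.reduceByKey((a, b) => (math.max(a._1, b._1), math.min(a._2, b._2), a._3 + b._3, a._4 + b._4))
        //turn the running (max, min, sum, count) into (max, min, avg)
        scala>rdd3.mapValues(t => (t._1, t._2, t._3.toDouble / t._4)).collect()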

3. View the job webui
--------------------
    http://192.168.231.101:4040

RDD
--------------------
    Resilient Distributed Dataset: similar to a collection in Java.

Spark programming in IDEA
--------------------
    1. Create the module
    2. Add Maven
        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
            <!-- add your project's groupId / artifactId / version here -->
            <dependencies>
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.11</artifactId>
                    <version>2.1.0</version>
                </dependency>
            </dependencies>
        </project>
    3. Write the code

    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Word count, Scala version.
      */
    object WordCountScala {
      def main(args: Array[String]): Unit = {
        //create the Spark configuration object
        val conf = new SparkConf()
        conf.setAppName("wcScala")
        conf.setMaster("local")

        //create the Spark context
        val sc = new SparkContext(conf)

        //load the file
        val rdd1 = sc.textFile("file:///d:/1.txt")

        //flatten
        val rdd2 = rdd1.flatMap(_.split(" "))

        //pair each word with 1: (word, 1)
        val rdd3 = rdd2.map(e => (e, 1))

        //aggregate by key
        val rdd4 = rdd3.reduceByKey(_ + _)

        val arr = rdd4.collect()
        for (e <- arr) {
          println(e)
        }
      }
    }

Java version of word count
-------------------
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    /**
     * Word count, Java version.
     */
    public class WordCountJava {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setAppName("wcJava");
            conf.setMaster("local");

            //create the Spark context
            JavaSparkContext sc = new JavaSparkContext(conf);

            //load the file
            JavaRDD<String> rdd1 = sc.textFile("file:///d:/1.txt");

            //flatten
            JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
                public Iterator<String> call(String s) throws Exception {
                    String[] arr = s.split(" ");
                    return Arrays.asList(arr).iterator();
                }
            });

            //pair each word with 1
            JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });

            //aggregate
            JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            //collect
            List<Tuple2<String, Integer>> list = rdd4.collect();
            for (Tuple2<String, Integer> t : list) {
                System.out.println(t._1() + " : " + t._2());
            }
        }
    }
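    Both IDEA programs hard-code setMaster("local"); other master URLs (used in the cluster sections below) can be swapped in at the same place. A minimal sketch of the common values:

    import org.apache.spark.SparkConf

    val conf = new SparkConf().setAppName("wc")
    conf.setMaster("local[*]")             //local mode, one worker thread per logical core
    //conf.setMaster("spark://s101:7077")  //standalone cluster (master on s101, as planned below)
    //conf.setMaster("yarn")               //Hadoop YARN; requires HADOOP_CONF_DIR to point at the Hadoop conf
    //conf.setMaster("mesos://host:5050")  //Mesos (host is a placeholder)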

Building a Spark cluster
-----------------
    1. Deployment modes
        1. local
            No Spark daemons at all; use the spark-shell interactive terminal and run the Spark API inside the JVM.
            Used for debugging and testing.
        2. standalone
            Standalone mode. Requires starting Spark's own processes: master + worker.
        3. yarn
            Runs on top of Hadoop YARN.
        4. mesos
            -
    2. Deploy Spark in standalone mode
        2.1) Plan
            s101 ~ s104
            s101    //master
            s102    //worker
            s103    //worker
            s104    //worker
        2.2) Distribute the Spark installation directory on s101 to all nodes
            $>su centos
            $>xsync.sh /soft/spark*
            $>xsync.sh /soft/spark
            $>su root
            $>xsync.sh /etc/profile
        2.3) In Spark's conf directory, create symlinks to the Hadoop configuration files
            xcall.sh "ln -s /soft/hadoop/etc/hadoop/hdfs-site.xml /soft/spark/conf/hdfs-site.xml"
            xcall.sh "ln -s /soft/hadoop/etc/hadoop/core-site.xml /soft/spark/conf/core-site.xml"
        2.4) Modify the slaves file
            [spark/conf/slaves]
            s102
            s103
            s104
        2.4') Configure spark/conf/spark-env.sh and distribute it
            export JAVA_HOME=/soft/jdk
        2.5) Start Hadoop HDFS first
            2.5.1) Start ZooKeeper
                [s101]
                $>xzk.sh start
            2.5.2) Start HDFS
                [s101]
                start-dfs.sh
        2.6) Start the Spark cluster
            $>spark/sbin/start-all.sh
        2.7) Verify the webui
            http://s101:8080

Start spark-shell against the Spark cluster and run word count
--------------------------------
    $>spark-shell --master spark://s101:7077
    $scala>sc.textFile("hdfs://mycluster/user/centos/1.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

Use nc to send each node's runtime information to s101 for inspection
---------------------------------------------------
    1. Define a function in spark-shell that sends a message to the remote server
        def sendInfo(str:String) = {
            val localIp = java.net.InetAddress.getLocalHost().getHostAddress()
            val socket = new java.net.Socket("192.168.231.101", 8888)
            val out = socket.getOutputStream()
            out.write((localIp + " ==> " + str + "\r\n").getBytes())
            out.flush()
            socket.close()
        }
    2. Start the nc server on s101
        nc -lk 8888
    3. Write the program
        val rdd1 = sc.textFile("hdfs://mycluster/user/centos/1.txt")
        val rdd2 = rdd1.flatMap(line=>{
            sendInfo(" flatMap() : " + line)
            line.split(" ")
        })
        val rdd3 = rdd2.map(word=>{
            sendInfo(" map() : " + word)
            (word , 1)
        })
        val rdd4 = rdd3.reduceByKey((a,b)=>{
            sendInfo(" reduceByKey() : " + a + " & " + b)
            a + b
        })
        rdd4.collect()

Export the program as a jar and run it on the Spark cluster
---------------------------------
    1. Change the master address
        conf.setMaster("spark://s101:7077")
        ...
    2. Export the jar
        (omitted)
    3. Copy the jar to the centos host
    4. Run the following commands to execute the program on the Spark cluster
        spark-submit --master spark://s101:7077 --class WordCountScala my-spark.jar
        spark-submit --master spark://s101:7077 --class WordCountJava my-spark.jar

Handling data skew in Spark
------------------------
    1. Start spark-shell in local mode
        $>spark-shell --master local[4]
    2. wordcount with a random "salt" appended to each key, then a second aggregation to remove it
       (a step-by-step version of this pipeline is sketched at the end of these notes)
        $>sc.textFile("file:///home/centos/1.txt").flatMap(_.split(" ")).map(e=>(e + "_" + scala.util.Random.nextInt(10) ,1)).reduceByKey(_+_).map(t=>(t._1.substring(0,t._1.lastIndexOf("_")),t._2)).reduceByKey(_+_).collect

Deploy the Spark program to run on the cluster
-------------------------
    1. Modify the program code to load the file from HDFS.
        conf.setMaster("spark://s101:7077") ;
        ...
        sc.textFile("hdfs://mycluster/user/centos/1.txt");
    2. Export the program and build the jar.
        project structure -> artifact -> + -> jar -> remove the bundled jars
    3. build -> artifacts -> myspark
    4. Locate the output directory and copy the jar to the centos host
        D:\big10\out\artifacts\myspark_jar
    5. Run the program on centos with the spark-submit command
        [Scala version]
        spark-submit --master spark://s101:7077 --class WCAppScala myspark.jar
        [Java version]
        spark-submit --master spark://s101:7077 --class com.oldboy.spark.java.WCAppJava myspark.jar

Spark cluster management
-----------------------
    [start]
    start-all.sh                        //start all Spark processes
    start-master.sh                     //start the master node
    start-slaves.sh                     //from the master node, start all worker nodes
    start-slave.sh spark://s101:7077    //log in to a single worker node and start its worker process

    [stop]
    stop-all.sh                         //stop all processes
    stop-master.sh                      //stop the master process
    stop-slaves.sh                      //stop all worker nodes
    stop-slave.sh                       //log in to each worker node and stop its worker process
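    As referenced in the data-skew section above, the same two-stage ("salting") pipeline broken into named steps; a minimal sketch for readability:
        //stage 1: append a random 0-9 suffix ("salt") so one hot key is spread across 10 keys
        val salted  = sc.textFile("file:///home/centos/1.txt")
                        .flatMap(_.split(" "))
                        .map(w => (w + "_" + scala.util.Random.nextInt(10), 1))
        val partial = salted.reduceByKey(_ + _)            //partial counts per salted key
        //stage 2: strip the salt and combine the partial counts into the real totals
        val totals  = partial.map(t => (t._1.substring(0, t._1.lastIndexOf("_")), t._2))
                             .reduceByKey(_ + _)
        totals.collect()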
