1. 程式人生 > >spark java 示例代碼wordcount

spark java 示例代碼wordcount

pairs form onf 字符串拆分 apt tex sco 初始化 weibo

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 使用Java的方式開發進行本地測試Spark的WordCount程序 * * @author DT大數據夢工廠 http://weibo.com/ilovepains */ public class WordCountSpk { public static void main(String[] args) {
/** * 第1步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息, * 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置 * 為local,則代表Spark程序在本地運行,特別適合於機器配置條件非常差(例如 只有1G的內存)的初學者 * */ SparkConf conf = new SparkConf().setAppName("Spark WordCount written by Java").setMaster("local"); /** * 第2步:創建SparkContext對象 * SparkContext是Spark程序所有功能的唯一入口,無論是采用Scala、Java、Python * 、R等都必須有一個SparkContext(不同的語言具體的類名稱不同,如果是Java的話則為JavaSparkContext) * SparkContext核心作用:初始化Spark應用程序運行所需要的核心組件,包括DAGScheduler、TaskScheduler、 * SchedulerBackend 同時還會負責Spark程序往Master註冊程序等 * SparkContext是整個Spark應用程序中最為至關重要的一個對象
*/ JavaSparkContext sc = new JavaSparkContext(conf); // 其底層實際上就是Scala的SparkContext /** * 第3步:根據具體的數據來源(HDFS、HBase、Local FS、DB、S3等)通過JavaSparkContext來創建JavaRDD * JavaRDD的創建基本有三種方式:根據外部的數據來源(例如HDFS)、根據Scala集合、由其它的RDD操作 * 數據會被RDD劃分成為一系列的Partitions,分配到每個Partition的數據屬於一個Task的處理範疇 * 註意:文件路徑不能直接用Windows路徑中的反斜扛\,要改成Linux下的斜扛/ */ JavaRDD<String> lines = sc .textFile("D:/hu.txt"); /** * 第4步:對初始的JavaRDD進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算 * 第4.1步:講每一行的字符串拆分成單個的單詞 */ JavaRDD<String> words = lines .flatMap(new FlatMapFunction<String, String>() { // 如果是Scala,由於SAM轉換,所以可以寫成val // words = // lines.flatMap // { line => // line.split(" ")} public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); /** * 第4步:對初始的JavaRDD進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算 * 第4.2步:在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word, 1) */ JavaPairRDD<String, Integer> pairs = words .mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); /** * 第4步:對初始的RDD進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算 * 第4.3步:在每個單詞實例計數為1基礎之上統計每個單詞在文件中出現的總次數 */ JavaPairRDD<String, Integer> wordsCount = pairs .reduceByKey(new Function2<Integer, Integer, Integer>() { // 對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce) public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>() { public void call(Tuple2<String, Integer> pairs) throws Exception { System.out.println(pairs._1 + " : " + pairs._2); } }); sc.close(); } }

spark java 示例代碼wordcount