1. 程式人生 > >使用Java實現在單機上統計單詞的數目

使用Java實現在單機上統計單詞的數目

準備

首先在IDE(eclipse或者IntelliJ IDEA)上面建立一個Java專案。匯入F:\spark\spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar這個jar包。

然後我們就可以開始寫程式啦!
new一個class,比如取名就叫WordCount吧,生成main函式。
main函式中的程式碼生成步驟如下:

第一步

建立SparkConf 物件,並設定spark應用的一些資訊

  • setAppName 設定應用名稱
  • setMaster 設定Spark應用程式要連線的Spark叢集的master結點的url,如果設定為local
    則表示在本地執行
        SparkConf conf = new SparkConf()
                .setAppName("WordCountApp")
                .setMaster("local");  

第二步

建立Java版本的SparkContext
SparkContext是Spark所有功能的一個入口
在不同型別的Spark應用程式中,使用的SparkContext是不同的:

  • Java:JavaSparkContext
  • scala:SparkContext
        JavaSparkContext sc 
= new JavaSparkContext(conf);

第三步

  • 讀取我們輸入的資料,資料來自資料來源(本地檔案或者hdfs檔案等等),建立一個RDD。
  • 資料來源中的資料會被打亂,然後而被分配到每個RDD的partitaon中去,從而形成一個初始的分散式資料集。
  • 下面程式中的textFile()方法是用於根據資料來源來建立RDD(在Java中,建立的普通RDD被稱為JavaRDD)。
  • 如果是本地檔案或者hdfs檔案,則RDD中的每一個元素相當於檔案裡面的一行。

注:RDD:彈性分散式資料集,是Spark對分散式資料和計算的基本抽象。

JavaRDD<String> lines = sc.textFile("E:/pythonProject/sparktest.txt"
);//這個來自檔案

第四步

將每行切分為單詞

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));  
            }

        });

第五步

轉化為鍵值對並計算

  • 對映為(單詞,1)這種形式,這樣在後面才能根據單詞作為key,來計算每個單詞出現的次數。
  • mapToPair()方法:將每個元素轉化為Tuple2型別的形式。
  • 下面程式中JavaPairRDD<String, Integer>中的引數是Tuple2的鍵值對型別。
  • 下面程式中的PairFunction<String, String, Integer>()中的引數分別代表:輸入引數,Tuple2的鍵值對型別。
        JavaPairRDD<String, Integer> pairs = words.mapToPair(

                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }

                });

第六步

統計每個單詞出現的次數

  • reduceByKey():通過key的值來減少JavaPairRDD<String, Integer>中元素的個數。
  • call():對相同key值的元素的value進行操作。如下面的程式return v1 + v2;就是將value的值相加,從而實現對單詞的計數。
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(

                new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }

                });

第七部

輸出統計結果

  • 之前我們進行的flatMapmapToPairreduceByKey操作被稱為transformation操作。
  • 在一個Spark應用程式中,僅僅有transformation操作是不夠的,還需要有action操作來觸發程式的執行。
  • 下面程式的foreach方法就是一種action操作,來觸發程式執行。
        wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println("單詞  "+wordCount._1 + "出現  " + wordCount._2 + " 次。");     
            }

        });

第八步

關閉JavaSparkContext

sc.close();

總結

以上就是一個用Java實現的Spark應用程式,用來統計一個檔案中每個單詞出現的次數的程式了!