Developing the first Spark program in IDEA: counting words in a text file

Before creating the project, make sure Scala and Java are installed locally: Spark is written in Scala, and Scala, like Java, is compiled to bytecode and executed on the JVM. My local Scala version is 2.11.0 and my Hadoop version is 2.7.6.

Step 1: Open IDEA and create a Maven project.

Add the following to the pom (the scala.version property, 2.11, is the Scala binary version used as the artifact suffix, so it must match the 2.11.x Scala installed locally):

   <properties>
        <spark.version>2.3.1</spark.version>
        <scala.version>2.11</scala.version>
        <geotools.version>20-SNAPSHOT</geotools.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_${scala.version}</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>com.esri.geometry</groupId>
            <artifactId>esri-geometry-api</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>
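
Note that only the spark-core dependency is strictly required for the word-count example below (hadoop-client is declared explicitly to pin the Hadoop version to the local 2.7.6). The remaining dependencies (spark-streaming, spark-sql, spark-hive, spark-mllib, the Kafka connector, the Esri geometry API and json-simple) are not used by this example and can be omitted if you only want to run it.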

Step 2: In the IDE, open File --> Project Structure, click Scala SDK, browse to the local Scala installation directory, and import the Scala library.
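
Strictly speaking, the Java example in the next step compiles without a Scala SDK, because spark-core already pulls in scala-library transitively; the SDK (together with the maven-scala-plugin configured above) is only needed once you add Scala sources to the project.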

Step 3: Write the Java version of the class:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Created by zhanglu on 2018/9/7.
 */
public class WordCount {
    public static void main(String[] args) {
//        Write the Spark program
//        Step 1: create the SparkConf object and set Spark's configuration
        SparkConf sparkConf=new SparkConf();
        sparkConf.setAppName("WordCount");
        sparkConf.setMaster("local");
//        Step 2: create the JavaSparkContext object, which initializes Spark's components
        JavaSparkContext javaSparkContext=new JavaSparkContext(sparkConf);
//        Step 3: create an RDD from the input source (HDFS or a local file); the input data is distributed
//        across the RDD's partitions to form the initial distributed dataset. textFile() builds the RDD from
//        the input, and every line of the file becomes one element of the RDD
        JavaRDD<String> javaRDD=javaSparkContext.textFile("E://個人/word_count.txt");
//        Step 4: apply transformations, i.e. the actual computation, to the initial RDD.
//        Split each line into individual words; this is usually done by creating a function and passing it
//        to RDD operators such as map or flatMap
        JavaRDD<String> words=javaRDD.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID=1L;
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
//        Map each word to a (word, 1) tuple
        JavaPairRDD<String,Integer> pairs=words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID=1L;
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String,Integer>(s,1);
            }
        });
//        Reduce the tuples above by key, iteratively summing the counts
        JavaPairRDD<String,Integer> wordCounts=pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID=1L;
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });

//        flatMap, mapToPair and reduceByKey above are all transformations; the program also needs an action before Spark actually runs the job
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID=1L;
            @Override
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2._1+" appeared "+stringIntegerTuple2._2+" times ");
            }
        });
        javaSparkContext.close();
    }
}
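
The anonymous inner classes above are the classic pre-Java-8 style. The same job can be written more compactly with Java 8 lambdas; the sketch below is functionally equivalent under the same assumptions (local master, same input path), and the class name WordCountLambda is just illustrative.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCountLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCountLambda").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> wordCounts = sc
                .textFile("E://個人/word_count.txt")                         // one RDD element per line
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())  // split each line into words
                .mapToPair(word -> new Tuple2<>(word, 1))                    // map each word to (word, 1)
                .reduceByKey(Integer::sum);                                  // sum the counts per word

        // foreach is the action that triggers the job
        wordCounts.foreach(t -> System.out.println(t._1 + " appeared " + t._2 + " times"));
        sc.close();
    }
}

Run the main method directly in IDEA. With an input file containing, say, "hello spark hello world", either version prints lines such as "hello appeared 2 times", "spark appeared 1 times" and "world appeared 1 times" (the order of the output lines is not guaranteed).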