
Writing a Spark Structured Streaming Word Count Program from Scratch

This post builds a Spark Structured Streaming word count program from scratch. The words come from a socket (readers can swap the source for Kafka), and the result is printed to the console (it could just as well be written to a Kafka topic).

Prepare the environment:

Install the JDK and Scala, and set the JAVA_HOME and SCALA_HOME environment variables. Here is the configured environment:

pilafiMac:~ pilaf$ java -version
java version "1.8.0_101"
Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.101-b13, mixed mode)
pilafiMac:~ pilaf$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

Download and install Spark
Download it from the Spark website (http://spark.apache.org/downloads.html).
Extract the archive:

pilafiMac:opt pilaf$ ls
spark-2.3.2-bin-hadoop2.7.tgz
pilafiMac:opt pilaf$ sudo tar -xzf spark-2.3.2-bin-hadoop2.7.tgz
pilafiMac:opt pilaf$ ls
spark-2.3.2-bin-hadoop2.7	spark-2.3.2-bin-hadoop2.7.tgz
pilafiMac:opt pilaf$ sudo rm -f spark-2.3.2-bin-hadoop2.7.tgz

Check that the installed Spark works:

pilafiMac:opt pilaf$ ls
spark-2.3.2-bin-hadoop2.7
pilafiMac:opt pilaf$ cd spark-2.3.2-bin-hadoop2.7/
pilafiMac:spark-2.3.2-bin-hadoop2.7 pilaf$ ls
LICENSE		README.md	conf		jars		python
NOTICE		RELEASE		data		kubernetes	sbin
R		bin		examples	licenses	yarn
pilafiMac:spark-2.3.2-bin-hadoop2.7 pilaf$ sudo ./bin/spark-shell
2018-10-30 14:08:11 WARN  Utils:66 - Your hostname, pilafiMac resolves to a loopback address: 127.0.0.1; using 192.168.1.189 instead (on interface en1)
2018-10-30 14:08:11 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-10-30 14:08:15 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.189:4040
Spark context available as 'sc' (master = local[*], app id = local-1540879704574).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Everything works as expected.

Write the Spark Structured Streaming program

Create a new Maven project in IDEA; its pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.pilaf</groupId>
    <artifactId>spark-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spark.version>2.3.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--Maven shade plugin: packages all dependencies into a single fat jar-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Write the code. There is only one class:

package com.pilaf.demo;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

/**
 * @author pilaf
 * @create: 2018-10-30 10:22
 */
public class SparkDemo {
    public static void main(String[] args) throws Exception{
        SparkSession sparkSession = SparkSession.builder()
                .appName("PilafSparkDemo")
                .getOrCreate();

        sparkSession.sparkContext().setLogLevel("WARN");
        //read the input stream from a socket; run nc -l 9999 locally to feed in words
        //the source could also be a Kafka topic (see the Kafka sketch after this class)
        Dataset<Row> lines = sparkSession.readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port","9999")
                .load();

        Dataset<String> words = lines.as(Encoders.STRING())
                .flatMap(new FlatMapFunction<String, String>() {
                    public Iterator<String> call(String s) throws Exception {
                        return Arrays.asList(s.split(" ")).iterator();
                    }
                }, Encoders.STRING());

        //count the occurrences of each word
        Dataset<Row> wordCounts = words.groupBy("value").count();

        StreamingQuery query = wordCounts.writeStream()
                //complete mode: every time the wordCounts Dataset is updated,
                //all rows (including unchanged ones) are written to the sink in full
                .outputMode("complete")
                //write to the console (the sink could also be Kafka)
                .format("console")
                .start();

        query.awaitTermination();
    }
}
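
As the comments above note, both the source and the sink could be Kafka instead of socket/console. Below is a minimal sketch of that variant, not tested in this project: it assumes you add the spark-sql-kafka-0-10_2.11 artifact (same ${spark.version}) to the pom, that a broker runs at localhost:9092, and that the topics words and word-counts exist. All of these names are placeholders for your own environment.

package com.pilaf.demo;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;

public class KafkaSparkDemo {
    public static void main(String[] args) throws Exception {
        SparkSession sparkSession = SparkSession.builder()
                .appName("PilafSparkKafkaDemo")
                .getOrCreate();
        sparkSession.sparkContext().setLogLevel("WARN");

        //read from an assumed Kafka topic "words"; each record's value holds one line of text
        Dataset<String> lines = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "words")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());

        Dataset<String> words = lines.flatMap(
                (FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator(),
                Encoders.STRING());

        Dataset<Row> wordCounts = words.groupBy("value").count();

        //the Kafka sink expects key/value columns and requires a checkpoint directory
        StreamingQuery query = wordCounts
                .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
                .writeStream()
                .outputMode("complete")
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("topic", "word-counts")
                .option("checkpointLocation", "/tmp/word-count-checkpoint")
                .start();

        query.awaitTermination();
    }
}

Note the checkpointLocation option: unlike the console sink, the Kafka sink will not start without one, since it uses the checkpoint to track which batches have already been written.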

Run mvn clean package (or use clean and package under Lifecycle in IDEA's Maven panel); a jar is then generated under the project's target directory:

pilafiMac:target pilaf$ pwd
/Users/pilaf/project/sparkdemo/target
pilafiMac:target pilaf$ ls
classes					spark-demo-1.0-SNAPSHOT.jar
maven-archiver				surefire
original-spark-demo-1.0-SNAPSHOT.jar
pilafiMac:target pilaf$ ls -lh
total 254536
drwxr-xr-x  3 pilaf  staff   102B 10 30 14:16 classes
drwxr-xr-x  3 pilaf  staff   102B 10 30 14:16 maven-archiver
-rw-r--r--  1 pilaf  staff   4.2K 10 30 14:16 original-spark-demo-1.0-SNAPSHOT.jar
-rw-r--r--  1 pilaf  staff   124M 10 30 14:17 spark-demo-1.0-SNAPSHOT.jar
drwxr-xr-x  2 pilaf  staff    68B 10 30 14:17 surefire
pilafiMac:target pilaf$ 

As you can see, spark-demo-1.0-SNAPSHOT.jar is 124 MB: this is the fat jar with all dependencies bundled in.
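
Most of those 124 MB are Spark's own jars. If you only ever launch the job through spark-submit against a full Spark installation (as we do below), one option, sketched here, is to mark the Spark dependencies as provided so the shade plugin leaves them out, since spark-submit already puts $SPARK_HOME/jars on the classpath at runtime:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <!--compile against Spark, but let spark-submit supply it at runtime-->
            <scope>provided</scope>
        </dependency>

With provided scope the classes are still visible at compile time, but the shaded jar shrinks to little more than your own code.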

Run nc -l 9999 to send strings to the socket:

pilafiMac:~ pilaf$ nc -l 9999
nihao
pilaf
apple
nihao
nihao apple orange
What would you like to eat ?

Run the Spark program to compute and print the word counts:

pilafiMac:target pilaf$ ls
classes					original-spark-demo-1.0-SNAPSHOT.jar	surefire
maven-archiver				spark-demo-1.0-SNAPSHOT.jar
pilafiMac:target pilaf$ export SPARK_HOME=/opt/spark-2.3.2-bin-hadoop2.7
pilafiMac:target pilaf$ /opt/spark-2.3.2-bin-hadoop2.7/bin/spark-submit \
> --class com.pilaf.demo.SparkDemo \
> --master local[4] \
> spark-demo-1.0-SNAPSHOT.jar
2018-10-30 15:05:41 WARN  Utils:66 - Your hostname, pilafiMac resolves to a loopback address: 127.0.0.1; using 192.168.1.189 instead (on interface en1)
2018-10-30 15:05:41 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-10-30 15:05:42 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-10-30 15:05:42 INFO  SparkContext:54 - Running Spark version 2.3.2
2018-10-30 15:05:42 INFO  SparkContext:54 - Submitted application: PilafSparkDemo
2018-10-30 15:05:43 INFO  SecurityManager:54 - Changing view acls to: pilaf
2018-10-30 15:05:43 INFO  SecurityManager:54 - Changing modify acls to: pilaf
2018-10-30 15:05:43 INFO  SecurityManager:54 - Changing view acls groups to: 
2018-10-30 15:05:43 INFO  SecurityManager:54 - Changing modify acls groups to: 
2018-10-30 15:05:43 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(pilaf); groups with view permissions: Set(); users  with modify permissions: Set(pilaf); groups with modify permissions: Set()
2018-10-30 15:05:43 INFO  Utils:54 - Successfully started service 'sparkDriver' on port 54653.
2018-10-30 15:05:43 INFO  SparkEnv:54 - Registering MapOutputTracker
2018-10-30 15:05:44 INFO  SparkEnv:54 - Registering BlockManagerMaster
2018-10-30 15:05:44 INFO  BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2018-10-30 15:05:44 INFO  BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up
2018-10-30 15:05:44 INFO  DiskBlockManager:54 - Created local directory at /private/var/folders/nx/8gkdvscn5r1_jlhwshgjr8j40000gp/T/blockmgr-9677604e-639d-4e43-9291-5b065f553c46
2018-10-30 15:05:44 INFO  MemoryStore:54 - MemoryStore started with capacity 366.3 MB
2018-10-30 15:05:44 INFO  SparkEnv:54 - Registering OutputCommitCoordinator
2018-10-30 15:05:44 INFO  log:192 - Logging initialized @5251ms
2018-10-30 15:05:44 INFO  Server:351 - jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
2018-10-30 15:05:44 INFO  Server:419 - Started @5480ms
2018-10-30 15:05:44 INFO  AbstractConnector:278 - Started ServerConnector@...{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-10-30 15:05:44 INFO  Utils:54 - Successfully started service 'SparkUI' on port 4040.
2018-10-30 15:05:44 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/jobs,null,AVAILABLE,@Spark}
(... similar ServletContextHandler lines for /stages, /storage, /environment, /executors, /static, /api, and the kill endpoints elided ...)
2018-10-30 15:05:44 INFO  SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://192.168.1.189:4040
2018-10-30 15:05:45 INFO  SparkContext:54 - Added JAR file:/Users/pilaf/project/sparkdemo/target/spark-demo-1.0-SNAPSHOT.jar at spark://192.168.1.189:54653/jars/spark-demo-1.0-SNAPSHOT.jar with timestamp 1540883145058
2018-10-30 15:05:45 INFO  Executor:54 - Starting executor ID driver on host localhost
2018-10-30 15:05:45 INFO  Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54658.
2018-10-30 15:05:45 INFO  NettyBlockTransferService:54 - Server created on 192.168.1.189:54658
2018-10-30 15:05:45 INFO  BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2018-10-30 15:05:45 INFO  BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, 192.168.1.189, 54658, None)
2018-10-30 15:05:45 INFO  BlockManagerMasterEndpoint:54 - Registering block manager 192.168.1.189:54658 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.189, 54658, None)
2018-10-30 15:05:45 INFO  BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, 192.168.1.189, 54658, None)
2018-10-30 15:05:45 INFO  BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, 192.168.1.189, 54658, None)
2018-10-30 15:05:45 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/metrics/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:48 WARN  TextSocketSourceProvider:66 - The socket source should not be used for production applications! It does not support recovery.
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|nihao|    1|
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|apple|    1|
|nihao|    2|
|pilaf|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|orange|    1|
| apple|    2|
| nihao|    3|
| pilaf|    1|
+------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|orange|    1|
|   you|    1|
| apple|    2|
|   eat|    1|
| nihao|    3|
|  like|    1|
|  What|    1|
| pilaf|    1|
| would|    1|
|     ?|    1|
|    to|    1|
+------+-----+
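
Because the query runs in complete mode, every batch re-prints the full result table, with counts accumulating across batches (nihao reaches 3, apple 2, and so on). If you only want to see the rows that changed in each batch, one variation (a sketch; the rest of SparkDemo stays the same) is to switch the writeStream call to update mode:

        StreamingQuery query = wordCounts.writeStream()
                //update mode: only rows whose counts changed in this trigger are emitted
                .outputMode("update")
                .format("console")
                .start();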

Reference: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html