1. 程式人生 > >java 實現 spark Streaming

java 實現 spark Streaming

示例 現在 redis contex 時間 mina main pack ket

1. 創建 maven 工程 只加 spark-streaming 這個包就可以

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.spark</
groupId> <artifactId>sparkStream</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version
> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.1</version> </dependency> <
dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-graphx_2.10</artifactId> <version>1.6.1</version> </dependency> </dependencies> </project>

2. 示例代碼

package com.dt.spark.sparkapps.sparkstreaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class WordCountOnLine {
    public static <U> void main(String[] args) {
        
        /**
         * 第一步:配置SparkConf,
         *     1. 因為 Spark Streaming 應用程序至少有一條線程用於不斷的循環結束數據,並且至少有一條線程用於處理
         *              接收的數據(否則的話無線程用於處理數據,隨著時間的推移,內存和磁盤都會不堪重負)
         *  2. 對於集群而已,每個 Executor 一般肯定不止一個線程,那對於處理 Spark Streaming應用程序而言,每個 Executor 一般分配多少Core
         *     比較合適?根據我們過去的經驗,5個左右的 Core 是最佳的(一個段子分配為基數 Core 表現最佳,)
         */
//        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("sparkStreaming");
        SparkConf conf = new SparkConf().setMaster("spark://hadoop1:7077").setAppName("sparkStreaming");
        
        /**
         * 第二步:創建 SparkStreamingContext,
         *     1.這個 SparkStreaming 應用程序所有功能的起始點和程序調度的核心
         *         SparkStreamingContext 的構建可以基於 SparkConf參數,也可基於持久化的 SaprkStreamingContext的內容來回復過來
         *         (典型的場景是 Driver 奔潰後重新啟動,由於 Spark Streaming 具有連續 7*24 小時不間斷運行的特征,所有需要在 Driver 重新啟動後繼續上一次的狀態,
         *            此時的狀態恢復需要基於曾經的 Checkpoint)
         *     2.在一個Spark Streaming 應用程序中可以創建若幹個 SaprkStreamingContext對象,使用下一個 SaprkStreamingContext
         *       之前需要把前面正在運行的 SparkStreamingContext 對象關閉掉,由此,我們獲得一個重大的啟發 SparkStreaming框架也只是Spark Core上的一個應用程序而言
         *      只不過 Spark Streaming 框架要運行的話需要Spark工程師寫業務邏輯處理代碼;
         */
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        
        /**
         * 第三步: 創建 Spark Streaming 輸入數據來源 input Stream
         *      1.數據輸入來源可以基於 File,HDFS, Flume,Kafka,Socket等;
         *      2.在這裏我們制定數據來源於網絡 Socket端口,Spark Streaming鏈接上改端口並在運行的時候一直監聽該端口的數據(當然該端口服務首先必須存在),並且在後續會根據業務需要不斷的
         *        有數據產生(當然對於Spark Streaming 引用程序的運行而言,有無數據其處理流程都是一樣的)
         *   3.如果經常在每隔 5 秒鐘沒有數據的話不斷的啟動空的 Job 其實是會造成調度資源的浪費,因為彬沒有數據需要發生計算;真實的企業級生產環境的代碼在具體提交 Job 前會判斷是否有數據,如果沒有的話
         *   不再提交 Job;
         */
        
       JavaReceiverInputDStream<String> lines = jsc.socketTextStream("hadoop1", 9999);
       
       /**
        * 第四步:接下來就是 對於 Rdd編程一樣基於 DStream進行編程!!!原因是DStream是RDD產生的模板(或者說類), 在 Saprk Stream發生計算前,其實質是把每個 Batch的DStream的操作翻譯
        *         成為 Rdd 的操作!!!
        */
       
       JavaDStream<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String line) throws Exception {
            String[] split = line.split(" ");
            return Arrays.asList(split);
        }
       });
       
       JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<String, Integer>(word, 1);
        }
       });
       
       JavaPairDStream<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1+v2;
        }
       });
       
       
       /**
        * 此處print並不會直接觸發 job 的執行,因為現在的一切都是在 Spark Streaming 框架的控制之下的,對於 Spark Streaming 而言具體是否觸發真正的 job 運行
        * 是基設置的  Duration 時間間隔觸發
        * 一定要註意的是 Spark Streaming應用程序要想執行具體的Job,對DStream就必須有 output Stream操作
        * output Stream有很多類型的函數觸發,類print,saveAsTextFile,saveAsHadoopFile等,最為重要的一個方法是 foreachRDD,因為Spark Streaming處理的結果一般都會放在 Redis,DB,
        * DashBoard等上面,foreachRDD主要就是用來完成這些功能的,而且可以隨意的自定義具體數據到底放在那裏
        */
       reduceByKey.print();
       
       /**
        * Spark Streaming 執行引擎也就是Driver開始運行,Driver啟動的時候是位於一條新的線程中的,當然其內部有消息接收應用程序本身或者 Executor 中的消息;
        * 
        */
       jsc.start();
       jsc.awaitTermination();
       
    }
}

java 實現 spark Streaming