1. 程式人生 > >Spark Streaming實戰對論壇網站動態行為pv,uv,註冊人數,跳出率的多維度分析,實時統計每天pv,uv的sparkStreaming結合redis結果存入mysql供前端展示

Spark Streaming實戰對論壇網站動態行為pv,uv,註冊人數,跳出率的多維度分析,實時統計每天pv,uv的sparkStreaming結合redis結果存入mysql供前端展示

論壇資料執行程式碼自動生成,該生成的資料會作為Producer的方式傳送給Kafka,然後SparkStreaming程式會從Kafka中線上Pull到論壇或者網站的使用者線上行為資訊,進而進行多維度的線上分析 資料格式如下: date:日期,格式為yyyy-MM-dd timestamp:時間戳 userID:使用者ID pageID:頁面ID chanelID:板塊的ID action:點選和註冊

生成的使用者點選模擬資料如下:

product:2017-06-20      1497948113817   1397    91      ML      View product:2017-06-20      1497948113819   149     1941    ML      Register product:2017-06-20      1497948113820   null    335     Spark   Register product:2017-06-20      1497948113821   1724    1038    ML      View product:2017-06-20      1497948113822   282     494     Flink   View product:2017-06-20      1497948113823   null    1619    ML      View product:2017-06-20      1497948113823   991     1950    ML      View product:2017-06-20      1497948113824   686     1347    Kafka   Register product:2017-06-20      1497948113825   1982    1145    Hive    View product:2017-06-20      1497948113826   211     1097    Storm   View product:2017-06-20      1497948113827   633     1345    Hive    View product:2017-06-20      1497948113828   957     1381    Hadoop  Register product:2017-06-20      1497948113831   300     1781    Spark   View product:2017-06-20      1497948113832   1244    1076    Hadoop  Register product:2017-06-20      1497948113833   1958    634     ML      View

生成模擬資料程式碼:

package org.apache.spark.examples.streaming;   import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import java.util.Random;   import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;   /**  * 這裡產生資料,就會發送給kafka,kafka那邊啟動消費者,就會接收到資料,這一步是用來測試生成資料和消費資料沒有問題的,確定沒問題後要關閉消費者,  * 啟動OnlineBBSUserLogss.java的類作為消費者,就會按pv,uv等方式處理這些資料。  * 因為一個topic只能有一個消費者,所以啟動程式前必須關閉kafka方式啟動的消費者(我這裡沒有關閉關閉kafka方式啟動的消費者也沒正常啊)   */ public class SparkStreamingDataManuallyProducerForKafkas extends Thread{          //具體的論壇頻道     static String[] channelNames = new  String[]{         "Spark","Scala","Kafka","Flink","Hadoop","Storm",         "Hive","Impala","HBase","ML"     };     //使用者的兩種行為模式     static String[] actionNames = new String[]{"View", "Register"};     private static Producer<String, String> producerForKafka;     private static String dateToday;     private static Random random;          //2、作為執行緒而言,要複寫run方法,先寫業務邏輯,再寫控制     @Override     public void run() {         int counter = 0;//搞500條         while(true){//模擬實際情況,不斷迴圈,非同步過程,不可能是同步過程            counter++;           String userLog = userlogs();           System.out.println("product:"+userLog);           //"test"為topic           producerForKafka.send(new KeyedMessage<String, String>("test", userLog));           if(0 == counter%500){                 counter = 0;                 try {                    Thread.sleep(1000);                 } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                 }             }         }     }              private static String userlogs() {         StringBuffer userLogBuffer = new StringBuffer("");         int[] unregisteredUsers = new int[]{1, 2, 3, 4, 5, 6, 7, 8};         long timestamp = new Date().getTime();             Long userID = 0L;             long pageID = 0L;             //隨機生成的使用者ID              if(unregisteredUsers[random.nextInt(8)] == 1) {                userID = null;             } else {                userID = (long) random.nextInt((int) 2000);             }             //隨機生成的頁面ID             pageID =  random.nextInt((int) 2000);                       //隨機生成Channel             String channel = channelNames[random.nextInt(10)];             //隨機生成action行為             String action = actionNames[random.nextInt(2)];                          userLogBuffer.append(dateToday)                         .append("\t")                         .append(timestamp)                         .append("\t")                         .append(userID)                         .append("\t")                         .append(pageID)                         .append("\t")                         .append(channel)                         .append("\t")                         .append(action);   //這裡不要加\n換行符,因為kafka自己會換行,再append一個換行符,消費者那邊就會處理不出資料         return userLogBuffer.toString();     }          public static void main(String[] args) throws Exception {       dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());       random = new Random();         Properties props = new Properties();         props.put("zk.connect", "h71:2181,h72:2181,h73:2181");         props.put("metadata.broker.list","h71:9092,h72:9092,h73:9092");         props.put("serializer.class", "kafka.serializer.StringEncoder");         ProducerConfig config = new ProducerConfig(props);         producerForKafka = new Producer<String, String>(config);         new SparkStreamingDataManuallyProducerForKafkas().start();      } }

pv,uv,註冊人數,跳出率的多維度分析程式碼:

package org.apache.spark.examples.streaming;   import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set;   import kafka.serializer.StringDecoder;   import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils;   import scala.Tuple2;   /*  *消費者消費SparkStreamingDataManuallyProducerForKafka類中邏輯級別產生的資料,這裡是計算pv,uv,註冊人數,跳出率的方式  */ public class OnlineBBSUserLogss {      public static void main(String[] args) {        /**          * 第一步:配置SparkConf:          * 1,至少2條執行緒:因為Spark Streaming應用程式在執行的時候,至少有一條          * 執行緒用於不斷的迴圈接收資料,並且至少有一條執行緒用於處理接受的資料(否則的話無法          * 有執行緒用於處理資料,隨著時間的推移,記憶體和磁碟都會不堪重負);          * 2,對於叢集而言,每個Executor一般肯定不止一個Thread,那對於處理Spark Streaming的          * 應用程式而言,每個Executor一般分配多少Core比較合適?根據我們過去的經驗,5個左右的          * Core是最佳的(一個段子分配為奇數個Core表現最佳,例如3個、5個、7個Core等);          */ //      SparkConf conf = new SparkConf().setMaster("spark://h71:7077").setAppName("OnlineBBSUserLogs");       SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");       /**          * 第二步:建立SparkStreamingContext:          * 1,這個是SparkStreaming應用程式所有功能的起始點和程式排程的核心          * SparkStreamingContext的構建可以基於SparkConf引數,也可基於持久化的SparkStreamingContext的內容          * 來恢復過來(典型的場景是Driver崩潰後重新啟動,由於Spark Streaming具有連續7*24小時不間斷執行的特徵,          * 所有需要在Driver重新啟動後繼續上次的狀態,此時的狀態恢復需要基於曾經的Checkpoint);          * 2,在一個Spark Streaming應用程式中可以建立若干個SparkStreamingContext物件,使用下一個SparkStreamingContext          * 之前需要把前面正在執行的SparkStreamingContext物件關閉掉,由此,我們獲得一個重大的啟發SparkStreaming框架也只是          * Spark Core上的一個應用程式而已,只不過Spark Streaming框架箱執行的話需要Spark工程師寫業務邏輯處理程式碼;          */       JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));         /**          * 第三步:建立Spark Streaming輸入資料來源input Stream:          * 1,資料輸入來源可以基於File、HDFS、Flume、Kafka、Socket等          * 2, 在這裡我們指定資料來源於網路Socket埠,Spark Streaming連線上該埠並在執行的時候一直監聽該埠          *         的資料(當然該埠服務首先必須存在),並且在後續會根據業務需要不斷的有資料產生(當然對於Spark Streaming          *         應用程式的執行而言,有無資料其處理流程都是一樣的);           * 3,如果經常在每間隔5秒鐘沒有資料的話不斷的啟動空的Job其實是會造成排程資源的浪費,因為並沒有資料需要發生計算,所以          *         例項的企業級生成環境的程式碼在具體提交Job前會判斷是否有資料,如果沒有的話就不再提交Job;          */       Map<String, String> kafkaParameters = new HashMap<String, String>();       kafkaParameters.put("metadata.broker.list","h71:9092,h72:9092,h73:9092");       Set topics = new HashSet<String>();       topics.add("test");       JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(                 jsc,                 String.class,                 String.class,                 StringDecoder.class,                 StringDecoder.class,                 kafkaParameters,                  topics);       //線上PV計算       onlinePagePV(lines);       //線上UV計算 //      onlineUV(lines);       //線上計算註冊人數 //      onlineRegistered(lines);       //線上計算跳出率 //      onlineJumped(lines);       //線上不同模組的PV //      onlineChannelPV(lines);              /*        * Spark Streaming執行引擎也就是Driver開始執行,Driver啟動的時候是位於一條新的執行緒中的,當然其內部有訊息迴圈體,用於        * 接受應用程式本身或者Executor中的訊息;        */       jsc.start();       jsc.awaitTermination();       jsc.close();    }      private static void onlineChannelPV(JavaPairInputDStream<String, String> lines) {       lines.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {          @Override          public Tuple2<String, Long> call(Tuple2<String,String> t) throws Exception {             String[] logs = t._2.split("\t");             String channelID =logs[4];             return new Tuple2<String,Long>(channelID, 1L);          }       }).reduceByKey(new Function2<Long, Long, Long>() { //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce)          @Override          public Long call(Long v1, Long v2) throws Exception {             return v1 + v2;          }       }).print();    }      private static void onlineJumped(JavaPairInputDStream<String, String> lines) {       lines.filter(new Function<Tuple2<String,String>, Boolean>() {          @Override          public Boolean call(Tuple2<String, String> v1) throws Exception {             String[] logs = v1._2.split("\t");             String action = logs[5];             if("View".equals(action)){                return true;             } else {                return false;             }          }       }).mapToPair(new PairFunction<Tuple2<String,String>, Long, Long>() {          @Override          public Tuple2<Long, Long> call(Tuple2<String,String> t) throws Exception {             String[] logs = t._2.split("\t");          // Long usrID = Long.valueOf(logs[2] != null ? logs[2] : "-1"); 這個有錯             Long usrID = Long.valueOf("null".equals(logs[2])  ? "-1" : logs[2]);             return new Tuple2<Long,Long>(usrID, 1L);          }       }).reduceByKey(new Function2<Long, Long, Long>() { //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce)          @Override          public Long call(Long v1, Long v2) throws Exception {             return v1 + v2;          }       }).filter(new Function<Tuple2<Long,Long>, Boolean>() {          @Override          public Boolean call(Tuple2<Long, Long> v1) throws Exception {             if(1 == v1._2){                return true;             } else {                return false;             }          }       }).count().print();    }      private static void onlineRegistered(JavaPairInputDStream<String, String> lines) {       lines.filter(new Function<Tuple2<String,String>, Boolean>() {          @Override          public Boolean call(Tuple2<String, String> v1) throws Exception {             String[] logs = v1._2.split("\t");             String action = logs[5];             if("Register".equals(action)){                return true;             } else {                return false;             }          }       }).count().print();    }      /**     * 因為要計算UV,所以需要獲得同樣的Page的不同的User,這個時候就需要去重操作,DStreamzhong有distinct嗎?當然沒有(截止到Spark 1.6.1的時候還沒有該Api)     * 此時我們就需要求助於DStream魔術般的方法tranform,在該方法內部直接對RDD進行distinct操作,這樣就是實現了使用者UserID的去重,進而就可以計算出UV了。     * @param lines     */    private static void onlineUV(JavaPairInputDStream<String, String> lines) {       /*        * 第四步:接下來就像對於RDD程式設計一樣基於DStream進行程式設計!!!原因是DStream是RDD產生的模板(或者說類),在Spark Streaming具體        * 發生計算前,其實質是把每個Batch的DStream的操作翻譯成為對RDD的操作!!!        * 對初始的DStream進行Transformation級別的處理,例如map、filter等高階函式等的程式設計,來進行具體的資料計算        */       JavaPairDStream<String, String> logsDStream = lines.filter(new Function<Tuple2<String,String>, Boolean>() {          @Override          public Boolean call(Tuple2<String, String> v1) throws Exception {             String[] logs = v1._2.split("\t");             String action = logs[5];             if("View".equals(action)){                return true;             } else {                return false;             }          }       });              //在單詞拆分的基礎上對每個單詞例項計數為1,也就是word => (word, 1)       logsDStream.map(new Function<Tuple2<String,String>,String>(){          @Override          public String call(Tuple2<String, String> v1) throws Exception {             String[] logs =v1._2.split("\t");             String usrID = String.valueOf(logs[2] != null ? logs[2] : "-1" );             //原文是Long usrID = Long.valueOf(logs[2] != null ? logs[2] : "-1" );             //報錯:java.lang.NumberFormatException: For input string: "null"             Long pageID = Long.valueOf(logs[3]);             return pageID+"_"+usrID;          }       }).transform(new Function<JavaRDD<String>,JavaRDD<String>>(){          @Override          public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {             // TODO Auto-generated method stub             return v1.distinct();          }       }).mapToPair(new PairFunction<String, Long, Long>() {          @Override          public Tuple2<Long, Long> call(String t) throws Exception {             String[] logs = t.split("_");             Long pageId = Long.valueOf(logs[0]);             return new Tuple2<Long,Long>(pageId, 1L);          }       }).reduceByKey(new Function2<Long, Long, Long>() { //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce)          @Override          public Long call(Long v1, Long v2) throws Exception {             return v1 + v2;          }       }).print();    }      private static void onlinePagePV(JavaPairInputDStream<String, String> lines) {       /*        * 第四步:接下來就像對於RDD程式設計一樣基於DStream進行程式設計!!!原因是DStream是RDD產生的模板(或者說類),在Spark Streaming具體        * 發生計算前,其實質是把每個Batch的DStream的操作翻譯成為對RDD的操作!!!        * 對初始的DStream進行Transformation級別的處理,例如map、filter等高階函式等的程式設計,來進行具體的資料計算        */       JavaPairDStream<String, String> logsDStream = lines.filter(new Function<Tuple2<String,String>, Boolean>() {          @Override          public Boolean call(Tuple2<String, String> v1) throws Exception {             String[] logs = v1._2.split("\t");             String action = logs[5];             if("View".equals(action)){                return true;             } else {                return false;             }          }       });              //在單詞拆分的基礎上對每個單詞例項計數為1,也就是word => (word, 1)       JavaPairDStream<Long, Long> pairs = logsDStream.mapToPair(new PairFunction<Tuple2<String,String>, Long, Long>() {          @Override          public Tuple2<Long, Long> call(Tuple2<String, String> t) throws Exception {             String[] logs = t._2.split("\t");             Long pageId = Long.valueOf(logs[3]);             return new Tuple2<Long,Long>(pageId, 1L);          }       });       //在單詞例項計數為1基礎上,統計每個單詞在檔案中出現的總次數       JavaPairDStream<Long, Long> wordsCount = pairs.reduceByKey(new Function2<Long, Long, Long>() { //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce)         //對相同的key,進行Value的累加(包括Local和Reducer級別同時Reduce)          @Override          public Long call(Long v1, Long v2) throws Exception {             return v1 + v2;          }       });              /*        * 此處的print並不會直接出發Job的執行,因為現在的一切都是在Spark Streaming框架的控制之下的,對於Spark Streaming        * 而言具體是否觸發真正的Job執行是基於設定的Duration時間間隔的        *         * 諸位一定要注意的是Spark Streaming應用程式要想執行具體的Job,對Dtream就必須有output Stream操作,        * output Stream有很多型別的函式觸發,類print、saveAsTextFile、saveAsHadoopFiles等,最為重要的一個        * 方法是foraeachRDD,因為Spark Streaming處理的結果一般都會放在Redis、DB、DashBoard等上面,foreachRDD        * 主要就是用用來完成這些功能的,而且可以隨意的自定義具體資料到底放在哪裡!!!        *        * 在企業生產環境下,一般會把計算的資料放入Redis或者DB中,採用J2EE等技術進行趨勢的繪製等,這就像動態更新的股票交易一下來實現        * 線上的監控等;        */       wordsCount.print();    } }

啟動hadoop、spark、zookeeper、kafka叢集(啟動過程就不多言了)這裡把我使用的版本列出: hadoop         hadoop-2.6.0-cdh5.5.2 kafka              kafka_2.10-0.8.2.0 spark             spark-1.3.1-bin-hadoop2.6(後來我又裝了spark-1.6.0-bin-hadoop2.6也行) zookeeper     zookeeper-3.4.5-cdh5.5.2

java                 jdk1.7.0_25

在myeclipse中建立專案:

(這裡我吐槽一下,在myeclipse-8.5和myeclipse-10.7.1版本中只能識別spark-1.3.1-bin-hadoop2.6的jar包卻無法識別spark-1.6.0-bin-hadoop2.6的jar包,雖然用spark-1.3.1-bin-hadoop2.6的jar包也能正常執行不影響什麼,但有強迫症的我咋能忍,無奈我下載了個myeclipse-pro-2014-GA版本(你下載最新的版本應該也可以吧)才這兩個版本spark的jar包都識別,我尼瑪也是醉了。。。)

將該專案打成streaming.jar包上從本地上傳到虛擬機器上,我這裡是上傳到了/home/hadoop/spark-1.3.1-bin-hadoop2.6目錄中

第一步:kafka建立topic

[[email protected] kafka_2.10-0.8.2.0]$ bin/kafka-topics.sh --create --zookeeper h71:2181 --replication-factor 2 --partitions 2 --topic test

(如果不建立該topic的話,也倒無妨,因為你如果先直接執行SparkStreamingDataManuallyProducerForKafkas.java的時候會自動建立topic,如果是先執行的OnlineBBSUserLogss.java的話雖然第一次會報錯:Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set(),但是它已經為你建立了該topic,再執行的話則不會報錯了,只不過他們建立的該topic都預設分割槽和副本都為1)

第二步:執行SparkStreamingDataManuallyProducerForKafka

[[email protected] spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.SparkStreamingDataManuallyProducerForKafkas --executor-memory 500m --total-executor-cores 2 streaming.jar

會報錯:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/producer/ProducerConfig         at com.spark.study.streaming.SparkStreamingDataManuallyProducerForKafkas.main(SparkStreamingDataManuallyProducerForKafkas.java:102)         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)         at java.lang.reflect.Method.invoke(Method.java:606)         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: kafka.producer.ProducerConfig         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)         at java.security.AccessController.doPrivileged(Native Method)         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)         ... 10 more

解決: 第一種方法:

在spark-env.sh中新增如下內容:

[[email protected] spark-1.3.1-bin-hadoop2.6]$ vi conf/spark-env.sh export SPARK_HOME=/home/hadoop/spark-1.3.1-bin-hadoop2.6 export SPARK_CLASSPATH=$SPARK_HOME/lib/*

再執行SparkStreamingDataManuallyProducerForKafka

[[email protected] spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.SparkStreamingDataManuallyProducerForKafkas --executor-memory 500m --total-executor-cores 2 streaming.jar

但是這種方法不是很好,因為再執行OnlineBBSUserLogss的時候會顯示如下內容但不影響執行:

[[email protected] spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.OnlineBBSUserLogss --executor-memory 500m --total-executor-cores 2 streaming.jar

17/06/21 22:49:46 WARN spark.SparkConf:  SPARK_CLASSPATH was detected (set to '/home/hadoop/spark-1.3.1-bin-hadoop2.6/lib/*'). This is deprecated in Spark 1.0+.   Please instead use:  - ./spark-submit with --driver-class-path to augment the driver classpath  - spark.executor.extraClassPath to augment the executor classpath

第二種方法:(推薦使用這種) 上面不是都已經提示了嘛,Please instead use: - ./spark-submit with --driver-class-path to augment the driver classpath

所以執行如下命令:

[[email protected] spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.SparkStreamingDataManuallyProducerForKafkas --executor-memory 500m --total-executor-cores 2 streaming.jar --driver-class-path /home/hadoop/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar

執行該命令後會產生資料寫入到kafka中,再執行

[[email protected] spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.OnlineBBSUserLogss --executor-memory 500m --total-executor-cores 2 streaming.jar --driver-class-path /home/hadoop/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar

注意:在spark-1.6.0-bin-hadoop2.6版本中--driver-class-path的位置還不能放在最後,否則無法識別,執行命令為

[[email protected] spark-1.6.0-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --driver-class-path /home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/spark-examples-1.6.0-hadoop2.6.0.jar --class org.apache.spark.examples.streaming.OnlineBBSUserLogss --executor-memory 500m --total-executor-cores 2 streaming.jar

OnlineBBSUserLogs成功消費資料,並統計出數值,實驗成功

.......   16/05/08 19:00:33 INFO scheduler.DAGScheduler: Job 2 finished: print at OnlineBBSUserLogs.java:113, took 0.385315 s ------------------------------------------- Time: 1462705200000 ms ------------------------------------------- (Flink,89) (Storm,99) (Scala,97) (HBase,107) (Spark,91) (Hadoop,108) (Hive,129) (Impala,82) (Kafka,101) (ML,97) ...

知識點: 1、建立kafka的createDirectStream,返回JavaPairInputDStream型別的line值 org.apache.spark.streaming.kafka.createDirectStream 原始碼 package org.apache.spark.streaming.kafka   /**    * Create an input stream that directly pulls messages from Kafka Brokers    * without using any receiver. This stream can guarantee that each message    * from Kafka is included in transformations exactly once (see points below).    *    * Points to note:    *  - No receivers: This stream does not use any receiver. It directly queries Kafka    *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked    *    by the stream itself. For interoperability with Kafka monitoring tools that depend on    *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.    *    You can access the offsets used in each batch from the generated RDDs (see    *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).    *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing    *    in the [[StreamingContext]]. The information on consumed offset can be    *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).    *  - End-to-end semantics: This stream ensures that every records is effectively received and    *    transformed exactly once, but gives no guarantees on whether the transformed data are    *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure    *    that the output operation is idempotent, or use transactions to output records atomically.    *    See the programming guide for more details.    *    * @param jssc JavaStreamingContext object    * @param keyClass Class of the keys in the Kafka records    * @param valueClass Class of the values in the Kafka records    * @param keyDecoderClass Class of the key decoder    * @param valueDecoderClass Class type of the value decoder    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">    *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"    *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in    *   host1:port1,host2:port2 form.    *   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"    *   to determine where the stream starts (defaults to "largest")    * @param topics Names of the topics to consume    * @tparam K type of Kafka message key    * @tparam V type of Kafka message value    * @tparam KD type of Kafka message key decoder    * @tparam VD type of Kafka message value decoder    * @return DStream of (Kafka message key, Kafka message value)    */   def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](       jssc: JavaStreamingContext,       keyClass: Class[K],       valueClass: Class[V],       keyDecoderClass: Class[KD],       valueDecoderClass: Class[VD],       kafkaParams: JMap[String, String],       topics: JSet[String]     ): JavaPairInputDStream[K, V] = {     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)     implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)     createDirectStream[K, V, KD, VD](       jssc.ssc,       Map(kafkaParams.asScala.toSeq: _*),       Set(topics.asScala.toSeq: _*)     )   } }

2、讀取kafka的資料流的值以後,進行相關mapToPair、reduceByKey的操作 mapToPair-reduceByKey-PairFunction-Function2的原始碼 package org.apache.spark.api.java.function.PairFunction  /**  * A function that returns key-value pairs (Tuple2<K, V>), and can be used to  * construct PairRDDs.  */ public interface PairFunction<T, K, V> extends Serializable {   public Tuple2<K, V> call(T t) throws Exception; }       package org.apache.spark.api.java.function.Function2 /**  * A two-argument function that takes arguments of type T1 and T2 and returns an R.  */ public interface Function2<T1, T2, R> extends Serializable {   public R call(T1 v1, T2 v2) throws Exception; }       package org.apache.spark.streaming.api.java.reduceByKey /**    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are    * merged using the associative reduce function. Hash partitioning is used to generate the RDDs    * with Spark's default number of partitions.    */   def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =     dstream.reduceByKey(func)     package org.apache.spark.streaming.api.java.mapToPair    /** Return a new DStream by applying a function to all elements of this DStream. */   def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {     def cm: ClassTag[(K2, V2)] = fakeClassTag     new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])   }

-----------------------------------------------------------------------------------------------------------------------------------------------------

最近有個需求,實時統計pv,uv,結果按照date,hour,pv,uv來展示,按天統計,第二天重新統計,當然了實際還需要按照型別欄位分類統計pv,uv,比如按照date,hour,pv,uv,type來展示。這裡介紹最基本的pv,uv的展示。

id    uv    pv    date    hour 1    155599    306053    2018-07-27    18 關於什麼是pv,uv,可以參見這篇部落格https://blog.csdn.net/petermsh/article/details/78652246

1、專案流程

日誌資料從flume採集過來,落到hdfs供其它離線業務使用,也會sink到kafka,sparkStreaming從kafka拉資料過來,計算pv,uv,uv是用的redis的set集合去重,最後把結果寫入mysql資料庫,供前端展示使用。

2、具體過程 1)pv的計算 拉取資料有兩種方式,基於received和direct方式,這裡用direct直拉的方式,用的mapWithState運算元儲存狀態,這個運算元與updateStateByKey一樣,並且效能更好。當然了實際中資料過來需要經過清洗,過濾,才能使用。

定義一個狀態函式

// 實時流量狀態更新函式   val mapFunction = (datehour:String, pv:Option[Long], state:State[Long]) => {     val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)     val output = (datehour,accuSum)     state.update(accuSum)     output   } 1 2 3 4 5 6 7  計算pv  val stateSpec = StateSpec.function(mapFunction)  val helper_count_all = helper_data.map(x => (x._1,1L)).mapWithState(stateSpec).stateSnapshots().repartition(2) 1 2 3 這樣就很容易的把pv計算出來了。

2)uv的計算 uv是要全天去重的,每次進來一個batch的資料,如果用原生的reduceByKey或者groupByKey對配置要求太高,在配置較低情況下,我們申請了一個93G的redis用來去重,原理是每進來一條資料,將date作為key,guid加入set集合,20秒重新整理一次,也就是將set集合的尺寸取出來,更新一下資料庫即可。

helper_data.foreachRDD(rdd => {         rdd.foreachPartition(eachPartition => {         // 獲取redis連線           val jedis = getJedis           eachPartition.foreach(x => {             val date:String = x._1.split(":")(0)             val key = date             // 將date作為key,guid(x._2)加入set集合             jedis.sadd(key,x._2)             // 設定儲存每天的資料的set過期時間,防止超過redis容量,這樣每天的set集合,定期會被自動刪除             jedis.expire(key,ConfigFactory.rediskeyexists)           })           // 關閉連線           closeJedis(jedis)         })       }) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 3)結果儲存到資料庫 結果儲存到mysql,資料庫,20秒重新整理一次資料庫,前端展示重新整理一次,就會重新查詢一次資料庫,做到實時統計展示pv,uv的目的。

/**     * 插入資料     * @param data (addTab(datehour)+helperversion)     * @param tbName     * @param colNames     */   def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {     data.foreachRDD(rdd => {       val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)       if (!rdd.isEmpty()) {         val hour_now = tmp_rdd.max() // 獲取當前結果中最大的時間,在資料恢復中可以起作用         rdd.foreachPartition(eachPartition => {           try {             val jedis = getJedis             val conn = MysqlPoolUtil.getConnection()             conn.setAutoCommit(false)             val stmt = conn.createStatement()             eachPartition.foreach(x => {               val datehour = x._1.split("\t")(0)               val helperversion = x._1.split("\t")(1)               val date_hour = datehour.split(":")               val date = date_hour(0)               val hour = date_hour(1).toInt

              val colName0 = colNames(0) // date               val colName1 = colNames(1) // hour               val colName2 = colNames(2) // count_all               val colName3 = colNames(3) // count               val colName4 = colNames(4) // helperversion               val colName5 = colNames(5) // datehour               val colName6 = colNames(6) // dh

              val colValue0 = addYin(date)               val colValue1 = hour               val colValue2 = x._2.toInt               val colValue3 = jedis.scard(date + "_" + helperversion) // // 2018-07-08_10.0.1.22               val colValue4 = addYin(helperversion)               var colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"               val colValue6 = if(hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

              var sql = ""               if (hour == hour_now) { // uv只對現在更新                 sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5}) on duplicate key update ${colName2} =  ${colValue2},${colName3} = ${colValue3}"               } else {                 sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5}) on duplicate key update ${colName2} =  ${colValue2}"               }               stmt.addBatch(sql)             })             closeJedis(jedis)             stmt.executeBatch() // 批量執行sql語句             conn.commit()             conn.close()           } catch {             case e: Exception => {               logger.error(e)               logger2.error(HelperHandle.getClass.getSimpleName + e)             }           }         })       }     })   }

// 計算當前時間距離次日零點的時長(毫秒) def resetTime = {     val now = new Date()     val todayEnd = Calendar.getInstance     todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR 12小時制     todayEnd.set(Calendar.MINUTE, 59)     todayEnd.set(Calendar.SECOND, 59)     todayEnd.set(Calendar.MILLISECOND, 999)     todayEnd.getTimeInMillis - now.getTime  } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 4)資料容錯 流處理消費kafka都會考慮到資料丟失問題,一般可以儲存到任何儲存系統,包括mysql,hdfs,hbase,redis,zookeeper等到。這裡用SparkStreaming自帶的checkpoint機制來實現應用重啟時資料恢復。

checkpoint 這裡採用的是checkpoint機制,在重啟或者失敗後重啟可以直接讀取上次沒有完成的任務,從kafka對應offset讀取資料。

// 初始化配置檔案 ConfigFactory.initConfig()

val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname) conf.set("spark.streaming.stopGracefullyOnShutdown","true") conf.set("spark.streaming.kafka.maxRatePerPartition",consumeRate) conf.set("spark.default.parallelism","24") val sc = new SparkContext(conf)

while (true){     val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0),getStreamingContext _ )     ssc.start()     ssc.awaitTerminationOrTimeout(resetTime)     ssc.stop(false,true) } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 checkpoint是每天一個目錄,在第二天凌晨定時銷燬StreamingContext物件,重新統計計算pv,uv。

注意 ssc.stop(false,true)表示優雅地銷燬StreamingContext物件,不能銷燬SparkContext物件,ssc.stop(true,true)會停掉SparkContext物件,程式就直接停了。

應用遷移或者程序升級 在這個過程中,我們把應用升級了一下,比如說某個功能寫的不夠完善,或者有邏輯錯誤,這時候都是需要修改程式碼,重新打jar包的,這時候如果把程式停了,新的應用還是會讀取老的checkpoint,可能會有兩個問題:

執行的還是上一次的程式,因為checkpoint裡面也有序列化的程式碼; 直接執行失敗,反序列化失敗; 其實有時候,修改程式碼後不用刪除checkpoint也是可以直接生效,經過很多測試,我發現如果對資料的過濾操作導致資料過濾邏輯改變,還有狀態操作儲存修改,也會導致重啟失敗,只有刪除checkpoint才行,可是實際中一旦刪除checkpoint,就會導致上一次未完成的任務和消費kafka的offset丟失,直接導致資料丟失,這種情況下我一般這麼做。

這種情況一般是在另外一個叢集,或者把checkpoint目錄修改下,我們是程式碼與配置檔案分離,所以修改配置檔案checkpoint的位置還是很方便的。然後兩個程式一起跑,除了checkpoint目錄不一樣,會重新建,都插入同一個資料庫,跑一段時間後,把舊的程式停掉就好。以前看官網這麼說,只能記住不能清楚明瞭,只有自己做時才會想一下辦法去保證資料準確。

5)日誌 日誌用的log4j2,本地儲存一份,ERROR級別的日誌會通過郵件傳送到手機。

val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)   // 郵件level=error日誌   val logger2 = LogManager.getLogger("email")