Spark Streaming in practice: multi-dimensional analysis of a forum site's user behavior (PV, UV, registrations, bounce rate), plus real-time daily PV/UV statistics with Spark Streaming and Redis, with results written to MySQL for front-end display
The forum data is generated automatically by code. The generated records are sent to Kafka by a producer; a Spark Streaming program then pulls the forum/website user behavior data from Kafka and performs multi-dimensional online analysis. The data format is:
date: the date, formatted as yyyy-MM-dd
timestamp: the timestamp
userID: the user ID
pageID: the page ID
channelID: the channel (board) ID
action: the user action, View (click) or Register
The generated simulated user click data looks like this:
product:2017-06-20 1497948113817 1397 91 ML View
product:2017-06-20 1497948113819 149 1941 ML Register
product:2017-06-20 1497948113820 null 335 Spark Register
product:2017-06-20 1497948113821 1724 1038 ML View
product:2017-06-20 1497948113822 282 494 Flink View
product:2017-06-20 1497948113823 null 1619 ML View
product:2017-06-20 1497948113823 991 1950 ML View
product:2017-06-20 1497948113824 686 1347 Kafka Register
product:2017-06-20 1497948113825 1982 1145 Hive View
product:2017-06-20 1497948113826 211 1097 Storm View
product:2017-06-20 1497948113827 633 1345 Hive View
product:2017-06-20 1497948113828 957 1381 Hadoop Register
product:2017-06-20 1497948113831 300 1781 Spark View
product:2017-06-20 1497948113832 1244 1076 Hadoop Register
product:2017-06-20 1497948113833 1958 634 ML View
Code that generates the simulated data:
package org.apache.spark.examples.streaming;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * This class generates data and sends it to Kafka. First start a Kafka console consumer to verify
 * that producing and consuming work; once verified, stop that consumer and start OnlineBBSUserLogss.java
 * as the consumer, which processes the data into PV, UV, and so on.
 * Because the topic is meant to have a single consumer here, the console consumer should be stopped
 * before running this program (in my case it also worked without stopping it).
 */
public class SparkStreamingDataManuallyProducerForKafkas extends Thread {

    // the forum channels
    static String[] channelNames = new String[]{
        "Spark", "Scala", "Kafka", "Flink", "Hadoop", "Storm",
        "Hive", "Impala", "HBase", "ML"
    };

    // the two kinds of user actions
    static String[] actionNames = new String[]{"View", "Register"};

    private static Producer<String, String> producerForKafka;
    private static String dateToday;
    private static Random random;

    // as a Thread, override run(): business logic first, then flow control
    @Override
    public void run() {
        int counter = 0; // send in batches of 500
        while (true) {   // simulate a real feed: keep looping, an asynchronous process
            counter++;
            String userLog = userlogs();
            System.out.println("product:" + userLog);
            // "test" is the topic
            producerForKafka.send(new KeyedMessage<String, String>("test", userLog));
            if (0 == counter % 500) {
                counter = 0;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static String userlogs() {
        StringBuffer userLogBuffer = new StringBuffer("");
        int[] unregisteredUsers = new int[]{1, 2, 3, 4, 5, 6, 7, 8};
        long timestamp = new Date().getTime();
        Long userID = 0L;
        long pageID = 0L;
        // randomly generated user ID (null simulates an unregistered visitor)
        if (unregisteredUsers[random.nextInt(8)] == 1) {
            userID = null;
        } else {
            userID = (long) random.nextInt((int) 2000);
        }
        // randomly generated page ID
        pageID = random.nextInt((int) 2000);
        // randomly generated channel
        String channel = channelNames[random.nextInt(10)];
        // randomly generated action
        String action = actionNames[random.nextInt(2)];
        userLogBuffer.append(dateToday)
            .append("\t")
            .append(timestamp)
            .append("\t")
            .append(userID)
            .append("\t")
            .append(pageID)
            .append("\t")
            .append(channel)
            .append("\t")
            .append(action);
        // Do not append a trailing "\n" here: Kafka adds the line break itself, and an extra
        // newline would break parsing on the consumer side.
        return userLogBuffer.toString();
    }

    public static void main(String[] args) throws Exception {
        dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        random = new Random();
        Properties props = new Properties();
        props.put("zk.connect", "h71:2181,h72:2181,h73:2181");
        props.put("metadata.broker.list", "h71:9092,h72:9092,h73:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        producerForKafka = new Producer<String, String>(config);
        new SparkStreamingDataManuallyProducerForKafkas().start();
    }
}
Code for the multi-dimensional analysis of PV, UV, registrations, and bounce rate:
package org.apache.spark.examples.streaming;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

/*
 * Consumes the data produced by SparkStreamingDataManuallyProducerForKafka and computes
 * PV, UV, the number of registrations, and the bounce rate.
 */
public class OnlineBBSUserLogss {

    public static void main(String[] args) {
        /**
         * Step 1: configure SparkConf.
         * 1. Use at least 2 threads: a Spark Streaming application needs at least one thread to
         *    continuously receive data and at least one thread to process it (otherwise no thread
         *    is left for processing and, over time, memory and disk become overwhelmed).
         * 2. On a cluster each Executor usually runs more than one thread. For Spark Streaming
         *    applications, around 5 cores per Executor has worked best in our experience (there is
         *    a saying that an odd number of cores, e.g. 3, 5 or 7, performs best).
         */
        // SparkConf conf = new SparkConf().setMaster("spark://h71:7077").setAppName("OnlineBBSUserLogs");
        SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");

        /**
         * Step 2: create the JavaStreamingContext.
         * 1. This is the starting point of every Spark Streaming application and the core of its
         *    scheduling. It can be built from a SparkConf, or restored from a persisted checkpoint
         *    (the typical scenario is resuming after a Driver crash, since Spark Streaming is meant
         *    to run 24/7 and must continue from its previous state after the Driver restarts).
         * 2. A Spark Streaming application may create several StreamingContexts, as long as the
         *    running one is stopped before the next is used. This also shows that Spark Streaming
         *    is itself just an application on top of Spark Core; the Spark engineer only supplies
         *    the business logic.
         */
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

        /**
         * Step 3: create the input DStream.
         * 1. The input source can be File, HDFS, Flume, Kafka, Socket, etc.
         * 2. Here the source is Kafka; Spark Streaming keeps pulling from the topic while it runs.
         *    The processing flow is the same whether or not data arrives in a given interval.
         * 3. Launching empty jobs every 5 seconds when there is no data wastes scheduling
         *    resources, so production code usually checks whether there is data before submitting
         *    a job.
         */
        Map<String, String> kafkaParameters = new HashMap<String, String>();
        kafkaParameters.put("metadata.broker.list", "h71:9092,h72:9092,h73:9092");
        Set topics = new HashSet<String>();
        topics.add("test");
        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
            jsc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParameters,
            topics);

        // online PV
        onlinePagePV(lines);
        // online UV
        // onlineUV(lines);
        // online number of registrations
        // onlineRegistered(lines);
        // online bounce rate
        // onlineJumped(lines);
        // online PV per channel
        // onlineChannelPV(lines);

        /*
         * Start the Spark Streaming execution engine: the Driver runs in a new thread with an
         * internal message loop that receives messages from the application itself or from the
         * Executors.
         */
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }

    private static void onlineChannelPV(JavaPairInputDStream<String, String> lines) {
        lines.mapToPair(new PairFunction<Tuple2<String, String>, String, Long>() {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
                String[] logs = t._2.split("\t");
                String channelID = logs[4];
                return new Tuple2<String, Long>(channelID, 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            // accumulate the values of identical keys (both locally and at the reducer level)
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        }).print();
    }

    private static void onlineJumped(JavaPairInputDStream<String, String> lines) {
        lines.filter(new Function<Tuple2<String, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, String> v1) throws Exception {
                String[] logs = v1._2.split("\t");
                String action = logs[5];
                return "View".equals(action);
            }
        }).mapToPair(new PairFunction<Tuple2<String, String>, Long, Long>() {
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, String> t) throws Exception {
                String[] logs = t._2.split("\t");
                // Long usrID = Long.valueOf(logs[2] != null ? logs[2] : "-1"); // wrong: the field is the string "null"
                Long usrID = Long.valueOf("null".equals(logs[2]) ? "-1" : logs[2]);
                return new Tuple2<Long, Long>(usrID, 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            // accumulate the values of identical keys (both locally and at the reducer level)
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        }).filter(new Function<Tuple2<Long, Long>, Boolean>() {
            // keep only users with exactly one view, i.e. the bounces
            @Override
            public Boolean call(Tuple2<Long, Long> v1) throws Exception {
                return 1 == v1._2;
            }
        }).count().print();
    }

    private static void onlineRegistered(JavaPairInputDStream<String, String> lines) {
        lines.filter(new Function<Tuple2<String, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, String> v1) throws Exception {
                String[] logs = v1._2.split("\t");
                String action = logs[5];
                return "Register".equals(action);
            }
        }).count().print();
    }

    /**
     * Computing UV requires the distinct users per page, i.e. deduplication. Does DStream have a
     * distinct()? It does not (at least up to Spark 1.6.1). So we use DStream's transform() and
     * call distinct() on the underlying RDD, which deduplicates the userIDs and lets us compute UV.
     */
    private static void onlineUV(JavaPairInputDStream<String, String> lines) {
        /*
         * Step 4: program against the DStream just as you would against RDDs. A DStream is the
         * template from which RDDs are produced; before the actual computation happens, the
         * DStream operations of each batch are translated into operations on RDDs.
         * Apply transformation-level processing (map, filter, and other higher-order functions)
         * to the initial DStream to do the actual computation.
         */
        JavaPairDStream<String, String> logsDStream = lines.filter(new Function<Tuple2<String, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, String> v1) throws Exception {
                String[] logs = v1._2.split("\t");
                String action = logs[5];
                return "View".equals(action);
            }
        });

        logsDStream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> v1) throws Exception {
                String[] logs = v1._2.split("\t");
                String usrID = String.valueOf(logs[2] != null ? logs[2] : "-1");
                // the original Long usrID = Long.valueOf(logs[2] != null ? logs[2] : "-1") fails with
                // java.lang.NumberFormatException: For input string: "null"
                Long pageID = Long.valueOf(logs[3]);
                return pageID + "_" + usrID;
            }
        }).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
                return v1.distinct();
            }
        }).mapToPair(new PairFunction<String, Long, Long>() {
            @Override
            public Tuple2<Long, Long> call(String t) throws Exception {
                String[] logs = t.split("_");
                Long pageId = Long.valueOf(logs[0]);
                return new Tuple2<Long, Long>(pageId, 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            // accumulate the values of identical keys (both locally and at the reducer level)
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        }).print();
    }

    private static void onlinePagePV(JavaPairInputDStream<String, String> lines) {
        /*
         * Step 4: program against the DStream just as you would against RDDs (see onlineUV above).
         */
        JavaPairDStream<String, String> logsDStream = lines.filter(new Function<Tuple2<String, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, String> v1) throws Exception {
                String[] logs = v1._2.split("\t");
                String action = logs[5];
                return "View".equals(action);
            }
        });

        // map each page view to (pageID, 1)
        JavaPairDStream<Long, Long> pairs = logsDStream.mapToPair(new PairFunction<Tuple2<String, String>, Long, Long>() {
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, String> t) throws Exception {
                String[] logs = t._2.split("\t");
                Long pageId = Long.valueOf(logs[3]);
                return new Tuple2<Long, Long>(pageId, 1L);
            }
        });

        // sum the counts per page
        JavaPairDStream<Long, Long> wordsCount = pairs.reduceByKey(new Function2<Long, Long, Long>() {
            // accumulate the values of identical keys (both locally and at the reducer level)
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

        /*
         * print() here does not directly trigger job execution; everything is under the control of
         * the Spark Streaming framework, and whether a job actually runs is governed by the
         * configured batch Duration.
         *
         * Note that a Spark Streaming application only runs jobs if the DStream has an output
         * operation: print, saveAsTextFile, saveAsHadoopFiles, and most importantly foreachRDD,
         * which is where results are typically written to Redis, a DB, a dashboard, etc., and
         * where you are free to decide exactly where the data goes.
         *
         * In production the computed results are usually written to Redis or a DB and rendered by
         * a J2EE front end for online monitoring, like a continuously updating stock ticker.
         */
        wordsCount.print();
    }
}
Start the hadoop, spark, zookeeper and kafka clusters (I will not go through the startup steps here). The versions I used:
hadoop: hadoop-2.6.0-cdh5.5.2
kafka: kafka_2.10-0.8.2.0
spark: spark-1.3.1-bin-hadoop2.6 (I later also installed spark-1.6.0-bin-hadoop2.6, which works too)
zookeeper: zookeeper-3.4.5-cdh5.5.2
java: jdk1.7.0_25
Create the project in MyEclipse:
(A side note: MyEclipse 8.5 and 10.7.1 could only recognize the spark-1.3.1-bin-hadoop2.6 jars and not the spark-1.6.0-bin-hadoop2.6 jars. Using the spark-1.3.1 jars still ran fine and affected nothing, but it bothered me, so I downloaded MyEclipse Pro 2014 GA (a newer version should presumably also work), which recognizes the jars of both Spark versions.)
Package the project as streaming.jar and upload it from the local machine to the server; I put it in /home/hadoop/spark-1.3.1-bin-hadoop2.6.
Step 1: create the Kafka topic
[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-topics.sh --create --zookeeper h71:2181 --replication-factor 2 --partitions 2 --topic test
(Creating the topic is not strictly necessary. If you run SparkStreamingDataManuallyProducerForKafkas.java first, it creates the topic automatically. If you run OnlineBBSUserLogss.java first, the first run fails with: Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set(), but the topic is created as a side effect, so subsequent runs succeed. In either case the auto-created topic defaults to 1 partition and 1 replica.)
Step 2: run SparkStreamingDataManuallyProducerForKafka
[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.SparkStreamingDataManuallyProducerForKafkas --executor-memory 500m --total-executor-cores 2 streaming.jar
This fails with:
Exception in thread "main" java.lang.NoClassDefFoundError: kafka/producer/ProducerConfig
        at com.spark.study.streaming.SparkStreamingDataManuallyProducerForKafkas.main(SparkStreamingDataManuallyProducerForKafkas.java:102)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.producer.ProducerConfig
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 10 more
Solution
Method 1:
Add the following to spark-env.sh:
[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ vi conf/spark-env.sh
export SPARK_HOME=/home/hadoop/spark-1.3.1-bin-hadoop2.6
export SPARK_CLASSPATH=$SPARK_HOME/lib/*
Then run SparkStreamingDataManuallyProducerForKafka again:
[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.SparkStreamingDataManuallyProducerForKafkas --executor-memory 500m --total-executor-cores 2 streaming.jar
This method is not ideal, though: when you later run OnlineBBSUserLogss, the following warning appears (it does not affect execution):
[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.OnlineBBSUserLogss --executor-memory 500m --total-executor-cores 2 streaming.jar
17/06/21 22:49:46 WARN spark.SparkConf: SPARK_CLASSPATH was detected (set to '/home/hadoop/spark-1.3.1-bin-hadoop2.6/lib/*').
This is deprecated in Spark 1.0+.
Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath
Method 2 (recommended):
The warning above already tells us what to do: "Please instead use: ./spark-submit with --driver-class-path to augment the driver classpath".
So run the following command:
[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.SparkStreamingDataManuallyProducerForKafkas --executor-memory 500m --total-executor-cores 2 streaming.jar --driver-class-path /home/hadoop/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar
This command starts producing data into Kafka. Then run:
[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.OnlineBBSUserLogss --executor-memory 500m --total-executor-cores 2 streaming.jar --driver-class-path /home/hadoop/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar
Note: with spark-1.6.0-bin-hadoop2.6, --driver-class-path cannot be placed at the end of the command or it is not recognized; the command is:
[hadoop@h71 spark-1.6.0-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --driver-class-path /home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/spark-examples-1.6.0-hadoop2.6.0.jar --class org.apache.spark.examples.streaming.OnlineBBSUserLogss --executor-memory 500m --total-executor-cores 2 streaming.jar
OnlineBBSUserLogs now consumes the data and produces the counts; the experiment succeeds:
.......
16/05/08 19:00:33 INFO scheduler.DAGScheduler: Job 2 finished: print at OnlineBBSUserLogs.java:113, took 0.385315 s
-------------------------------------------
Time: 1462705200000 ms
-------------------------------------------
(Flink,89)
(Storm,99)
(Scala,97)
(HBase,107)
(Spark,91)
(Hadoop,108)
(Hive,129)
(Impala,82)
(Kafka,101)
(ML,97)
...
Knowledge points:
1. Creating the Kafka direct stream with createDirectStream, which returns the JavaPairInputDStream value lines.
Source of org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream:

package org.apache.spark.streaming.kafka

/**
 * Create an input stream that directly pulls messages from Kafka Brokers
 * without using any receiver. This stream can guarantee that each message
 * from Kafka is included in transformations exactly once (see points below).
 *
 * Points to note:
 *  - No receivers: This stream does not use any receiver. It directly queries Kafka
 *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
 *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
 *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
 *    You can access the offsets used in each batch from the generated RDDs (see
 *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
 *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
 *    in the [[StreamingContext]]. The information on consumed offset can be
 *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
 *  - End-to-end semantics: This stream ensures that every records is effectively received and
 *    transformed exactly once, but gives no guarantees on whether the transformed data are
 *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
 *    that the output operation is idempotent, or use transactions to output records atomically.
 *    See the programming guide for more details.
 *
 * @param jssc JavaStreamingContext object
 * @param keyClass Class of the keys in the Kafka records
 * @param valueClass Class of the values in the Kafka records
 * @param keyDecoderClass Class of the key decoder
 * @param valueDecoderClass Class type of the value decoder
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
 *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
 *   host1:port1,host2:port2 form.
 *   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
 *   to determine where the stream starts (defaults to "largest")
 * @param topics Names of the topics to consume
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
    jssc: JavaStreamingContext,
    keyClass: Class[K],
    valueClass: Class[V],
    keyDecoderClass: Class[KD],
    valueDecoderClass: Class[VD],
    kafkaParams: JMap[String, String],
    topics: JSet[String]
  ): JavaPairInputDStream[K, V] = {
  implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
  implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
  implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
  implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
  createDirectStream[K, V, KD, VD](
    jssc.ssc,
    Map(kafkaParams.asScala.toSeq: _*),
    Set(topics.asScala.toSeq: _*)
  )
}
2. After reading the Kafka data stream, apply the mapToPair and reduceByKey operations.
Sources of PairFunction, Function2, reduceByKey, and mapToPair:

// org.apache.spark.api.java.function.PairFunction
/**
 * A function that returns key-value pairs (Tuple2<K, V>), and can be used to
 * construct PairRDDs.
 */
public interface PairFunction<T, K, V> extends Serializable {
  public Tuple2<K, V> call(T t) throws Exception;
}

// org.apache.spark.api.java.function.Function2
/**
 * A two-argument function that takes arguments of type T1 and T2 and returns an R.
 */
public interface Function2<T1, T2, R> extends Serializable {
  public R call(T1 v1, T2 v2) throws Exception;
}

// reduceByKey, in org.apache.spark.streaming.api.java.JavaPairDStream
/**
 * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
 * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
 * with Spark's default number of partitions.
 */
def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
  dstream.reduceByKey(func)

// mapToPair, in org.apache.spark.streaming.api.java.JavaDStreamLike
/** Return a new DStream by applying a function to all elements of this DStream. */
def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
  def cm: ClassTag[(K2, V2)] = fakeClassTag
  new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
-----------------------------------------------------------------------------------------------------------------------------------------------------
Recently there was a requirement to compute PV and UV in real time and display the results as date, hour, pv, uv rows, aggregated per day and restarted the next day. In practice the statistics also need to be broken down by a type field, i.e. date, hour, pv, uv, type, but this post covers the basic PV/UV case.
id    uv      pv      date        hour
1     155599  306053  2018-07-27  18

For what PV and UV mean, see https://blog.csdn.net/petermsh/article/details/78652246
1. Project pipeline
Log data is collected by Flume and lands on HDFS for other offline jobs, and is also sinked to Kafka. Spark Streaming pulls the data from Kafka and computes PV and UV; UV is deduplicated with a Redis set. Finally the results are written to a MySQL database for the front end to display.
2. Implementation
1) Computing PV
There are two ways to pull data, receiver-based and direct; here the direct approach is used. The running state is kept with the mapWithState operator, which plays the same role as updateStateByKey but performs better. In practice the incoming data of course has to be cleaned and filtered before it can be used; a sketch of that step is shown below.
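The post does not show the cleaning/parsing step that produces helper_data. Purely as an illustration, here is a minimal Scala sketch of how a cleaned stream of (datehour, guid) pairs could be built from a Kafka direct stream; the broker list, topic name, field layout and the buildHelperData name are assumptions, not the original code.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed helper that builds the cleaned (datehour, guid) stream; all names are illustrative.
def buildHelperData(ssc: StreamingContext): DStream[(String, String)] = {
  val kafkaParams = Map("metadata.broker.list" -> "h71:9092,h72:9092,h73:9092") // assumed brokers
  val topics = Set("helper_log")                                                // assumed topic name

  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)

  messages
    .map(_._2)               // keep only the record value
    .map(_.split("\t"))      // assumed tab-separated log format
    .filter(_.length >= 2)   // drop malformed lines
    .map { fields =>
      val datehour = fields(0) // e.g. "2018-07-27:18", the key used for the PV state
      val guid     = fields(1) // device/user id used for UV deduplication
      (datehour, guid)
    }
}

// helper_data is then the stream the snippets below operate on:
// val helper_data = buildHelperData(ssc)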
Define a state update function:
// state update function for the running traffic counts
val mapFunction = (datehour: String, pv: Option[Long], state: State[Long]) => {
  val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
  val output = (datehour, accuSum)
  state.update(accuSum)
  output
}

Computing PV:

val stateSpec = StateSpec.function(mapFunction)
val helper_count_all = helper_data.map(x => (x._1, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)

With this, PV is computed easily.
2) Computing UV
UV has to be deduplicated over the whole day. Deduplicating each incoming batch with a plain reduceByKey or groupByKey demands too much from a modest cluster, so we provisioned a 93 GB Redis instance for the deduplication. The idea: for every incoming record, use the date as the key and add the guid to a Redis set; every 20 seconds, read the set's size and update the database with it.
helper_data.foreachRDD(rdd => {
  rdd.foreachPartition(eachPartition => {
    // get a Redis connection
    val jedis = getJedis
    eachPartition.foreach(x => {
      val date: String = x._1.split(":")(0)
      val key = date
      // use the date as the key and add the guid (x._2) to the set
      jedis.sadd(key, x._2)
      // expire each day's set so it is cleaned up automatically and Redis does not run out of space
      jedis.expire(key, ConfigFactory.rediskeyexists)
    })
    // close the connection
    closeJedis(jedis)
  })
})

3) Storing the results in the database
The results are written to the MySQL database every 20 seconds; whenever the front end refreshes, it queries the database again, which gives real-time PV/UV display.
/**
 * Insert the results.
 * @param data (addTab(datehour) + helperversion)
 * @param tbName
 * @param colNames
 */
def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
  data.foreachRDD(rdd => {
    val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
    if (!rdd.isEmpty()) {
      val hour_now = tmp_rdd.max() // the latest hour in this batch, useful for data recovery
      rdd.foreachPartition(eachPartition => {
        try {
          val jedis = getJedis
          val conn = MysqlPoolUtil.getConnection()
          conn.setAutoCommit(false)
          val stmt = conn.createStatement()
          eachPartition.foreach(x => {
            val datehour = x._1.split("\t")(0)
            val helperversion = x._1.split("\t")(1)
            val date_hour = datehour.split(":")
            val date = date_hour(0)
            val hour = date_hour(1).toInt

            val colName0 = colNames(0) // date
            val colName1 = colNames(1) // hour
            val colName2 = colNames(2) // count_all
            val colName3 = colNames(3) // count
            val colName4 = colNames(4) // helperversion
            val colName5 = colNames(5) // datehour
            val colName6 = colNames(6) // dh

            val colValue0 = addYin(date)
            val colValue1 = hour
            val colValue2 = x._2.toInt
            val colValue3 = jedis.scard(date + "_" + helperversion) // e.g. 2018-07-08_10.0.1.22
            val colValue4 = addYin(helperversion)
            var colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'"
                            else "'" + date + " " + hour + ":00 " + helperversion + "'"
            val colValue6 = if (hour < 10) "'" + date + " 0" + hour + ":00'"
                            else "'" + date + " " + hour + ":00'"

            var sql = ""
            if (hour == hour_now) { // uv is only updated for the current hour
              sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
            } else {
              sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5}) on duplicate key update ${colName2} = ${colValue2}"
            }
            stmt.addBatch(sql)
          })
          closeJedis(jedis)
          stmt.executeBatch() // execute the batched SQL statements
          conn.commit()
          conn.close()
        } catch {
          case e: Exception => {
            logger.error(e)
            logger2.error(HelperHandle.getClass.getSimpleName + e)
          }
        }
      })
    }
  })
}

// milliseconds from now until midnight (used to restart the StreamingContext daily)
def resetTime = {
  val now = new Date()
  val todayEnd = Calendar.getInstance
  todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR is 12-hour; HOUR_OF_DAY is 24-hour
  todayEnd.set(Calendar.MINUTE, 59)
  todayEnd.set(Calendar.SECOND, 59)
  todayEnd.set(Calendar.MILLISECOND, 999)
  todayEnd.getTimeInMillis - now.getTime
}

4) Fault tolerance
Any streaming job that consumes Kafka has to consider data loss. Offsets can generally be saved to any store: MySQL, HDFS, HBase, Redis, ZooKeeper, and so on. Here, Spark Streaming's built-in checkpoint mechanism is used to recover data when the application restarts.
Checkpointing
With the checkpoint mechanism, after a failure or restart the application can pick up the unfinished work directly and resume reading from the corresponding Kafka offsets.
// initialize the configuration
ConfigFactory.initConfig()
val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate)
conf.set("spark.default.parallelism", "24")
val sc = new SparkContext(conf)
while (true) {
  val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0), getStreamingContext _)
  ssc.start()
  ssc.awaitTerminationOrTimeout(resetTime)
  ssc.stop(false, true)
}

The checkpoint uses one directory per day; shortly after midnight the StreamingContext is destroyed on schedule and PV/UV are computed afresh for the new day.
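The getStreamingContext factory passed to getOrCreate is not shown in the post. Purely as an illustration, here is a minimal sketch of what such a factory could look like; the 20-second batch interval, the table and column names, and the buildHelperData helper from the earlier sketch are assumptions, not the original implementation.

import org.apache.spark.streaming.{Seconds, StateSpec, StreamingContext}

// Hypothetical factory: builds a new StreamingContext (with checkpointing enabled) when no
// checkpoint exists for today; otherwise getOrCreate restores the context from the checkpoint
// and this function is not called.
def getStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(20))                    // assumed 20s batches, matching the refresh interval
  ssc.checkpoint(ConfigFactory.checkpointdir + DateUtil.getDay(0))   // today's checkpoint directory

  val helper_data = buildHelperData(ssc)                             // cleaned (datehour, guid) stream, see earlier sketch

  // cumulative PV per datehour, kept with mapWithState as shown in section 1)
  val stateSpec = StateSpec.function(mapFunction)
  val helper_count_all = helper_data.map(x => (x._1, 1L)).mapWithState(stateSpec).stateSnapshots()

  // the helper_data.foreachRDD(...) block from section 2) would also be registered here to fill the Redis sets

  // upsert the results into MySQL; "helper_stat" and the column names are assumed for illustration
  insertHelper(helper_count_all, "helper_stat",
    "date", "hour", "count_all", "count", "helperversion", "datehour", "dh")

  ssc
}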
Note: ssc.stop(false, true) stops the StreamingContext gracefully without stopping the SparkContext. ssc.stop(true, true) would also stop the SparkContext and terminate the whole program.
Application migration or upgrades
During this period we upgraded the application, for example because a feature was incomplete or contained a logic error; that means changing code and rebuilding the jar. If you stop the program, the new build will still read the old checkpoint, which can cause two problems, listed below.
It keeps running the previous program, because the checkpoint also contains the serialized code; or
it fails to start, because deserialization fails.
Sometimes modified code does take effect without deleting the checkpoint, but after a lot of testing I found that changes to the data-filtering logic or to the state operations make the restart fail, and only deleting the checkpoint helps. Deleting the checkpoint, however, loses the unfinished work and the consumed Kafka offsets, i.e. data is lost. In that situation I usually do the following.
Run the new version on another cluster, or simply point it at a different checkpoint directory; since our code and configuration are separated, changing the checkpoint path in the config file is easy. Then run both programs in parallel: apart from the checkpoint directory, which is created anew, they write into the same database. After they have run side by side for a while, stop the old one. I had read this approach on the official site before, but it only became clear once I had to guarantee data correctness myself.
5) Logging
Logging uses log4j2; a copy is kept locally, and ERROR-level messages are sent to my phone by email.
val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
// logger for level=error email notifications
val logger2 = LogManager.getLogger("email")