1. 程式人生 > >大資料專案實戰之新聞話題的實時統計分析

大資料專案實戰之新聞話題的實時統計分析

摘要: 本文講解一個完整的企業級大資料專案實戰,實時|離線統計分析使用者的搜尋話題,並用酷炫的前端介面展示出來。這些指標對網站的精準營銷、運營都有極大幫助。

前言:本文是一個完整的大資料專案實戰,實時|離線統計分析使用者的搜尋話題,並用酷炫的前端介面展示出來。這些指標對網站的精準營銷、運營都有極大幫助。架構大致是按照企業標準來的,從日誌的採集、轉化處理、實時計算、JAVA後臺開發、WEB前端展示,一條完整流程線下來,甚至每個節點都用的高可用架構,都考慮了故障轉移和容錯性。所用到的框架包括:Hadoop(HDFS+MapReduce+Yarn)+Flume+KafKa+Hbase+Hive+Spark(SQL、Structured Streaming )+Hue+Mysql+SpringMVC+Mybatis+Websocket+AugularJs+Echarts。所涉及到的語言包括:JAVA、Scala、Shell。
由於本文並非零基礎教學,所以只講架構和流程,基礎性知識自行查缺補漏。Github已經上傳完整專案程式碼

:liuyanling41-Github

最終效果圖如下:

view

專案架構圖如下:

_

環境準備

在這裡插入圖片描述

**

模擬網站實時產生日誌資訊

**

  • 獲取資料來源,本文是利用搜狗的資料:搜狗實驗室

  • 編寫java類模擬實時採集網站日誌。主要利用Java中的輸入輸出流。寫好後打成jar包傳到伺服器上

    public class ReadWebLog {

     private static String readFileName;
     private static String writeFileName;
    
     public static void main(String args[]) {
         readFileName = args[0];
         writeFileName = args[1];
         readFile(readFileName);
    
     }
    
     public static void readFile(String fileName) {
    
         try {
             FileInputStream fis = new FileInputStream(fileName);
             InputStreamReader isr = new InputStreamReader(fis, "GBK");
             //以上兩步已經可以從檔案中讀取到一個字元了,但每次只讀取一個字元不能滿足大資料的需求。故需使用BufferedReader,它具有緩衝的作用,可以一次讀取多個字元
             BufferedReader br = new BufferedReader(isr);
             int count = 0;
             while (br.readLine() != null) {
                 String line = br.readLine();
                 count++;
                 // 顯示行號
                 Thread.sleep(300);
                 String str = new String(line.getBytes("UTF8"), "GBK");
                 System.out.println("row:" + count + ">>>>>>>>" + line);
                 writeFile(writeFileName, line);
             }
             isr.close();
         } catch (IOException e) {
             e.printStackTrace();
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
    
    
     public static void writeFile(String fileName, String conent) {
         try {
             FileOutputStream fos = new FileOutputStream(fileName, true);
             OutputStreamWriter osw = new OutputStreamWriter(fos);
        
             BufferedWriter bw = new BufferedWriter(osw);
             bw.write("\n");
             bw.write(conent);
             bw.close();
         } catch (UnsupportedEncodingException e) {
             e.printStackTrace();
         } catch (FileNotFoundException e) {
             e.printStackTrace();
         } catch (IOException e) {
             e.printStackTrace();
         }
     }
    

    }

image

- 編寫採集日誌的shell指令碼

vim weblog.sh
#/bin/bash
echo "start log"
java -jar /home/weblog.jar /usr/local/weblog.log /home/weblogs.log
  • 執行效果圖

在這裡插入圖片描述

Flume Agent2採集日誌資訊

主要通過設定Source、Channel、Sink來完成日誌採集。

配置flume配置檔案 vim agent2.conf

a2.sources = r2
a2.channels = c2
a2.sinks = k2

a2.sources.r2.type = exec
#來源於weblogs.log檔案
a2.sources.r2.command = tail -F /home/weblogs.log
a2.sources.r2.channels = c2

a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 100
a2.channels.c2.keep-alive = 10

a2.sinks.k2.type = avro
a2.sinks.k2.channel = c2

 落地點是master機器的5555埠(主機名和埠號都必須與master機器的flume配置保持一致)
a2.sinks.k2.hostname = master
a2.sinks.k2.port = 5555
  • 編寫shell指令碼,方便執行。vim flume.sh

#/bin/bash
echo “flume agent2 start”
bin/flume-ng agent --conf conf --name a2 --conf-file conf/agent2.conf -Dflume.root.logger=INFO,console

  • 執行的時候直接 ./flume.sh 即可

Flume Agent3採集日誌資訊

各方面配置都和Agent2完全一樣、省略。

Flume Agent1整合日誌資訊

  • vim agent1.conf

#Flume Agent1實時整合日誌資訊

a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaS hbaseS

flume + hbase
a1.sources.r1.type = avro
a1.sources.r1.channels = kafkaC hbaseC
a1.sources.r1.bind = master
a1.sources.r1.port = 5555

a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000

a1.sinks.hbaseS.type = asynchbase
a1.sinks.hbaseS.table = weblogs
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.hbaseS.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
a1.sinks.hbaseS.channel = hbaseC

flume + kafka
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000

a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.kafkaS.zookeeperConnect = master:2181,slave1:2181,slave2:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder

  • vim flume.sh

    #/bin/bash
    echo “flume agent1 start”
    bin/flume-ng agent --conf conf --name a1 --conf-file conf/agent1.conf -Dflume.root.logger=INFO,console

具體講解如下:

Flume與Hbase的整合

  • 通過檢視官方文件可知,Flume與Hbase的整合主要需要如下引數,表名、列簇名、以及Java類SimpleAsyncHbaseEventSerializer。

在這裡插入圖片描述

  • 改寫SimpleAsyncHbaseEventSerializer
    下載Flume原始碼,需要改寫如下兩個Java類.

Flume原始碼imageimage
在這裡插入圖片描述

  • 打成jar包,上傳到linux伺服器中替換原有flume目錄的該jar包

imageimage
在這裡插入圖片描述

  • Flume配置檔案配置Sink為Hbase

    a1.sinks.hbaseS.type = asynchbase
    a1.sinks.hbaseS.table = weblogs
    a1.sinks.hbaseS.columnFamily = info
    a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
    a1.sinks.hbaseS.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
    a1.sinks.hbaseS.channel = hbaseC

Flume與Kafka的整合

  • Flume配置檔案:主要配置topic、brokerlist:

image

a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.kafkaS.zookeeperConnect = master:2181,slave1:2181,slave2:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder

  • 編寫kafka消費端指令碼,消費從flume傳過來的資訊。

    vim flume.sh

    #/bin/bash
    echo “flume agent1 start”
    bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic weblogs --from-beginning

執行效果圖
kafka_flume_

Kafka與Spark整合完成資料實時處理

這裡我選擇的是2.2版本中的StructuredStreaming,因為它相比SparkStreaming而言有很多優勢,它的出現重點就是解決端到端的精確一次語義,保證資料的不丟失不重複,這對於流式計算極為重要。StructuredStreaming的輸入源為kafka,spark對來自kafka的資料進行計算,主要就是累加話題量和訪問量。具體程式碼參考github。

val spark = SparkSession.builder()


 .master("local[2]")
  .appName("streaming").getOrCreate()

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "master:9092")
  .option("subscribe", "weblogs")
  .load()

import spark.implicits._
val lines = df.selectExpr("CAST(value AS STRING)").as[String]
val weblog = lines.map(_.split(",")).map(x => Weblog(x(0), x(1), x(2), x(3), x(4), x(5)))
val titleCount = weblog.groupBy("searchname").count().toDF("titleName", "webcount")

Spark與Mysql整合

這裡選擇Mysql是因為,我們的需求只是報表展示,需要在前臺展示的欄位並不多,關係型資料庫完全能夠支撐。在Hbase裡有幾百萬條資料(一個瀏覽話題可能有十幾萬人搜尋過,也就是說一個話題就有十幾萬條資料,這麼大量資料當然要存在Hbase中),而經過spark的計算,這十幾萬條資料在mysql中就變成了一條資料(XXX話題,XXX瀏覽量)。
如果業務需求變了,我需要實時查詢使用者各種資訊(資料量很大,欄位很多),那麼當然就是實時的直接從Hbase裡查,而不會在Mysql中。
所以企業中要根據不同的業務需求,充分考慮資料量等問題,進行架構的選擇。

val url = "jdbc:mysql://master:3306/weblog?useSSL=false"
val username = "root"
val password = "123456"

val writer = new JdbcSink(url, username, password)
val weblogcount = titleCount.writeStream
  .foreach(writer)
  .outputMode("update")
  .start()

weblogcount.awaitTermination()

離線分析:HIVE整合HBASE。

我們知道Hive是一個數據倉庫,主要就是轉為MapReduce完成對大量資料的離線分析和決策。之前我們已經用Flume整合Hbase,使得Hbase能源源不斷的插入資料。那麼我們直接將HIVE整合HBase,這樣只要Hbase有資料了,那Hive表也就有資料了。怎麼整合呢?很簡單,用【外部表】就搞定了。

CREATE EXTERNAL TABLE `weblogs`(
  `id` string COMMENT 'from deserializer', 
  `datatime` string COMMENT 'from deserializer', 
  `userid` string COMMENT 'from deserializer', 
  `searchname` string COMMENT 'from deserializer', 
  `retorder` string COMMENT 'from deserializer', 
  `cliorder` string COMMENT 'from deserializer', 
  `cliurl` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.hbase.HBaseSerDe' 
STORED BY 
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ( 
  'hbase.columns.mapping'=':key,info:datatime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl', 
  'serialization.format'='1')
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='false', 
  'hbase.table.name'='weblogs', 
  'numFiles'='0', 
  'numRows'='-1', 
  'rawDataSize'='-1', 
  'totalSize'='0', 
  'transient_lastDdlTime'='1518778031')

驗證一下HBASE和HIVE是不是同步的:

image
image

好了現在我們可以在Hive中盡情的離線分析和決策了~~~

SpringMVC+Mybatis完成對mysql資料的查詢

個人覺得傳統JDBC實在是太笨重,還是最喜歡Spring整合Mybatis對資料庫進行操作。這裡主要完成的操作就是對mysql的資料進行查詢。詳情請參考github,地址文章開頭已給出。
image

WebSocket實現全雙工通訊

既然要實現客戶端實時接收伺服器端的訊息,而伺服器端又實時接收客戶端的訊息,必不可少的就是WebSocket了,WebSocket實現了瀏覽器與伺服器全雙工通訊(full-duple),能更好的節省伺服器資源和頻寬並達到實時通訊。WebSocket用HTTP握手之後,伺服器和瀏覽器就使用這條HTTP連結下的TCP連線來直接傳輸資料,拋棄了複雜的HTTP頭部和格式。一旦WebSocket通訊連線建立成功,就可以在全雙工模式下在客戶端和伺服器之間來回傳送WebSocket訊息。即在同一時間、任何方向,都可以全雙工傳送訊息。WebSocket 核心就是OnMessage、OnOpen、OnClose,本專案使用的是和Spring整合的方式,因此需要有configurator = SpringConfigurator.class。

@ServerEndpoint(value = "/websocket", configurator = SpringConfigurator.class)
public class WebSocket {
    @Autowired
    private WebLogService webLogService;
    @OnMessage
    public void onMessage(String message, Session session) throws IOException, InterruptedException {
        String[] titleNames = new String[10];
        Long[] titleCounts = new Long[10];
        Long[] titleSum = new Long[1];
        while (true) {
            Map<String, Object> map = new HashMap<String, Object>();
            List<WebLogBO> list = webLogService.webcount();
            System.out.print(list);
            for (int i = 0; i < list.size(); i++) {
                titleNames[i] = list.get(i).getTitleName();
                titleCounts[i] = list.get(i).getWebcount();
            }
            titleSum[0] = webLogService.websum();
            map.put("titleName", titleNames);
            map.put("titleCount", titleCounts);
            map.put("titleSum", titleSum);
            System.out.print(map);
            session.getBasicRemote().sendText(JSON.toJSONString(map));
            Thread.sleep(1000);
            map.clear();
        }
    }

    @OnOpen
    public void onOpen() {
        System.out.println("Client connected");
    }

    @OnClose
    public void onClose() {
        System.out.println("Connection closed");
    }
}

Echarts完成前端介面展示

大家可以看到開頭給出的專案效果圖還是蠻漂亮的,其實非常簡單,就是用的Echarts這個框架。直接給它傳值就ok了,其他前端那些事它都給你搞定了。詳情請參考github,地址文章開頭已給出。

    function webcount(json) {
        var option = {
            title: {
                text: '搜狗新聞熱點實時統計',
                subtext: '作者:劉彥伶'
            },
            tooltip: {
                trigger: 'axis',
                axisPointer: {
                    type: 'shadow'
                }
            },
            legend: {
                data: ['瀏覽量']
            },
            grid: {
                left: '3%',
                right: '4%',
                bottom: '3%',
                containLabel: true
            },
            xAxis: {
                type: 'value',
                boundaryGap: [0, 0.01]
            },
            yAxis: {
                type: 'category',
                data: json.titleName
            },
            series: [
                {
                    name: '瀏覽量',
                    type: 'bar',
                    data: json.titleCount
                },

            ]
        };
        countchart.setOption(option);
    }

本文講解的比較粗糙,有很多細節的東西,畢竟一整個專案不可能用一篇文章說清楚。。。所以實踐的東西需要讀者自己去領悟,但是架構、環境搭建、方法、流程還是很有參考價值的!

原文出至:https://yq.aliyun.com/articles/557454