
Big Data Project in Practice: News Topic Statistics and Analysis

Preface: This is a complete, hands-on big data project that analyzes users' search topics both in real time and offline, and displays the results through a Java EE web front end. These metrics are extremely useful for a site's precision marketing and operations. The architecture roughly follows enterprise practice: log collection, transformation, real-time computation, Java back end and web front end form one complete pipeline, and every node uses a highly available setup that takes failover and fault tolerance into account. Frameworks used: Hadoop (HDFS + MapReduce + YARN) + Flume + Kafka + HBase + Hive + Spark (SQL, Streaming) + MySQL + SpringMVC + MyBatis + WebSocket + AngularJS + ECharts.

Languages involved: Java, Scala, Shell

Project deployment URL: http://120.79.35.74:443/Hot_News_Web/

For the project source code, contact: [email protected]

Project architecture diagram:

 

Part 1: Data Source Processing (news data from Sogou Labs, XML -----> TXT: parsing a large batch of XML files in Java, code posted below)

 

Approach: use SAXReader to read the XML content and build a News entity class so the records can be written to a TXT file; then write the ReadWebLog class and a shell script that runs it on Linux to simulate the generation of news-search logs.

Run the jar on Linux: java -jar <path-to-your-uploaded-jar> args0 args1

Or via a shell script:

#!/bin/bash
echo "start log"
java -jar <path-to-your-uploaded-jar> args0 args1
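A concrete example of the invocation (the jar name and input path are hypothetical placeholders; the output path is the file that the Flume source tails in Part 2):

#!/bin/bash
echo "start log"
# args0 = the TXT file produced by the XML parser, args1 = the log file collected by Flume
java -jar /home/hotnews/readweblog.jar /home/hotnews/news.log /home/hotnews/weblogs.log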

 

Code:

Converting the Sogou Labs source data: XML -----> TXT

package cn.yusys.hotnews.datasource;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Parses the Sogou Labs news XML file into a TXT file ----> project data source
 * @author Tangzhi mail:[email protected]
 * Created on 2018-11-12
 */
public class MyXMLReader2JDOM {
    public static void main(String[] args) {
        // Create the XML reader
        SAXReader reader = new SAXReader();
        // Set the character encoding
        reader.setEncoding("utf-8");
        Document document;
        Element rootElement;
        List<Element> docList;
        // Holds the parsed records before they are written to news.log
        ArrayList<News> list = new ArrayList<News>();
        try {
            document = reader.read(new File("D:\\Downloads\\大資料資料來源\\news_tensite_xml.smarty.dat"));
            // Root element <docs>...</docs>
            rootElement = document.getRootElement();
            // Child elements <doc>...</doc>
            docList = rootElement.elements("doc");
            // Walk the <doc> elements and copy the fields of each news item into a News entity
            for (Element e : docList) {
                News news = new News();
                if (e.element("url") != null && !e.element("url").getStringValue().trim().isEmpty()) {
                    news.setUrl(e.element("url").getStringValue().trim());
                }
                if (e.element("docno") != null && !e.element("docno").getStringValue().trim().isEmpty()) {
                    news.setDocno(e.element("docno").getStringValue().trim());
                }
                if (e.element("contenttitle") != null && !e.element("contenttitle").getStringValue().trim().isEmpty()) {
                    news.setContenttitle(e.element("contenttitle").getStringValue().trim());
                }
                if (e.element("content") != null && !e.element("content").getStringValue().trim().isEmpty()) {
                    news.setContent(e.element("content").getStringValue().trim());
                }
                list.add(news);
            }
            // Write the parsed records to the TXT file
            writeToFile(list);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Write to the TXT file (shard the output later if the source file grows too large)
     * @throws IOException
     */
    public static void writeToFile(List<News> list) throws IOException {
        File file = new File("D:\\Downloads\\大資料資料來源\\news2.log");
        // Create the output file on the first run
        if (!file.exists()) {
            file.createNewFile();
        }
        BufferedWriter bw = new BufferedWriter(new FileWriter(file));
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (News news : list) {
            // One pipe-delimited record per line, prefixed with the current timestamp
            String dateStr = sdf.format(new Date());
            bw.write("datetime" + "=" + dateStr + "|");
            bw.write("url" + "=" + news.getUrl() + "|");
            bw.write("docno" + "=" + news.getDocno() + "|");
            bw.write("contenttitle" + "=" + news.getContenttitle() + "|");
            bw.write("content" + "=" + news.getContent());
            bw.write("\n");
            bw.flush();
        }
        bw.close();
    }
}

----------------------------------------------------------------------------------------------------------------

----------------------------------------------------------------------------------------------------------------

package cn.yusys.hotnews.datasource;

import java.io.Serializable;

/**
 * News entity used while parsing the XML
 */
public class News implements Serializable {
    // Implements Serializable so that several machines can parse in parallel

    public News() {
    }

    public News(String url, String docno, String contenttitle, String content) {
        super();
        this.url = url;
        this.docno = docno;
        this.contenttitle = contenttitle;
        this.content = content;
    }

    String url;
    String docno;
    String contenttitle;
    String content;

    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public String getDocno() {
        return docno;
    }
    public void setDocno(String docno) {
        this.docno = docno;
    }
    public String getContenttitle() {
        return contenttitle;
    }
    public void setContenttitle(String contenttitle) {
        this.contenttitle = contenttitle;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
}

-----------------------------------------------------------------------------------------

-----------------------------------------------------------------------------------------

Running on Linux to simulate log generation, with Flume collecting the output

package cn.yusys.hotnews.datasource;

import java.io.*;

/**
 * Simulates a log server producing logs (reads records from news.log/news1.log
 * and appends them to a log file -----> that file is then collected by Flume)
 * @author Tangzhi mail:[email protected]
 * @date 2018-11-12
 */
public class ReadWebLog {
    public static String readFileName;
    public static String writeFileName;

    public static void main(String[] args) {
        readFileName = args[0];
        writeFileName = args[1];
        readFile(readFileName);
    }

    /**
     * Read log records from news.log/news1.log
     */
    public static void readFile(String fileName) {
        try {
            FileInputStream fs = new FileInputStream(fileName);
            // Wrap the byte stream in a UTF-8 character stream
            InputStreamReader isr = new InputStreamReader(fs, "utf-8");
            BufferedReader br = new BufferedReader(isr);
            int count = 0;
            String line;
            // Read one line per iteration (calling readLine() twice per loop would skip every other record)
            while ((line = br.readLine()) != null) {
                count++;
                // Configurable read interval in milliseconds
                Thread.sleep(1000);
                System.out.println("row:" + count + ">>>>>>>>" + line);
                // Append to the target file (must match the Flume source configuration)
                writeFile(writeFileName, line);
            }
            br.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Append a single line to the target file
     */
    public static void writeFile(String fileName, String line) {
        try {
            FileOutputStream fs = new FileOutputStream(fileName, true);
            OutputStreamWriter osw = new OutputStreamWriter(fs);
            BufferedWriter bw = new BufferedWriter(osw);
            bw.write(line);
            bw.write("\n");
            bw.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

 

Q&A

Q1: Java exception: "Invalid byte 2 of 2-byte UTF-8 sequence."
A1: Open the file in a text editor, re-save it with UTF-8 encoding, and open it again in Notepad++ (any other editor works too).

Q2: "Could not find the main class" when running the jar on Linux
A2: When building with IDEA, add the following plugin to pom.xml and put the fully qualified name of your main class inside <mainClass></mainClass>:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Output on Linux (screenshot):

Note: when the source data file is very large, ReadWebLog can be set up to read from several log files at random, which also makes the simulated data more realistic.

Sogou Labs: Chinese language processing resources and part of the data set: http://www.sogou.com/labs/

Note: clean the raw data up front according to the project's requirements.

At this point, data source acquisition and simple cleaning (uid assignment, timestamp binding, filtering of some records) are complete.

 

 

Part 2: Connecting Flume to HBase and Kafka

Approach: Flume collects the source data and writes it into HBase -----> HBase is mapped into Hive -----> Hadoop MapReduce performs the offline analysis -----> ECharts front end
          Flume also delivers the source data to Kafka -----> Kafka feeds Spark Streaming -----> Spark + MySQL perform the real-time analysis -----> ECharts front end
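For the HBase-to-Hive step of the offline path, here is a minimal Hive DDL sketch. It is an illustration rather than code from the original project: it assumes the weblogs table, the info column family and the column names defined in the Flume HBase sink configuration later in this part, with the HBase row key exposed as id:

CREATE EXTERNAL TABLE weblogs (
  id STRING,
  datatime STRING,
  url STRING,
  docno STRING,
  contenttitle STRING,
  content STRING
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:datatime,info:url,info:docno,info:contenttitle,info:content")
TBLPROPERTIES ("hbase.table.name" = "weblogs");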

 

1. Flume and HBase
    Tips: the Flume sink here targets HBase (prerequisite: HBase is installed on Linux and you understand how HBase works). Edit hbase-site.xml to set the HBase data directory and the ZooKeeper address.
    Common HBase shell commands:
        Start HBase: start-hbase.sh
        Enter the interactive shell: ./hbase shell
        Create a table: create '<table name>', '<column family 1>','<column family 2>','<column family N>'.......
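For example, the table this project writes to (the name and column family match the Flume HBase sink configuration in the appendix below; shown here as an illustration):

        create 'weblogs','info'
        count 'weblogs'        # row count, compared later against the data consumed from Kafka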

The HBase configuration files are shown below:

 

##hbase-env.sh
export JAVA_HOME=/opt/jdk1.7.0_65          # your VM's JDK path (see /etc/profile)
export JAVA_CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export HBASE_MANAGES_ZK=false              # do not use HBase's bundled ZooKeeper; use the external ZooKeeper cluster

 

##hbase-site.xml
<configuration>
  <property>
    <name>hbase.master</name>
    <value>192.168.25.136:60000</value>
  </property>
  <property>
    <name>hbase.master.maxclockskew</name>
    <value>180000</value>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://192.168.25.136:9000/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>192.168.25.136</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/tmp/hbase_data</value>
  </property>
</configuration>

 

##regionservers
localhost        # replace with the cluster's IP addresses or hostnames

 

Q&A

Q1: Creating an HBase table fails with: ERROR: Can not get address from ZooKeeper; znode data == null
A1: 1. Make sure the ZooKeeper cluster, the Hadoop cluster and the HBase services are all up and healthy.
    2. vi hbase-site.xml and check that the HBase data directory is readable and writable.

Q2: After starting HBase 0.94 via the scripts, creating a table fails with
ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times
A2:
   1. Check whether the Hadoop and ZooKeeper clusters are available.
   2. HBase 0.94 and Hadoop 2.x do not work well together; the version mismatch breaks RPC communication. A better combination is HBase 0.98.x + Hadoop 2.x, but note that Flume 1.7.x is then the recommended Flume release, for the reason shown in the figure below:

 

 

2. Flume and Kafka
    Tips: the Flume sink here targets Kafka (prerequisite: Kafka is installed on Linux and you understand how Kafka works).
    Common Kafka shell commands:
        Start Kafka: bin/kafka-server-start.sh config/server.properties   (start with a specific config file)
        Create a topic: bin/kafka-topics.sh --create --zookeeper 192.168.25.136:2181 --replication-factor 1 --partitions 1 --topic weblogs
        Delete a topic: bin/kafka-topics.sh --delete --zookeeper 192.168.25.136:2181 --topic weblogs
        Consume a topic from the console: bin/kafka-console-consumer.sh --zookeeper 192.168.25.136:2181 --topic weblogs --from-beginning
        Produce messages from the console: bin/kafka-console-producer.sh --broker-list 192.168.25.136:9092 --topic weblogs
        Describe a topic: bin/kafka-topics.sh --zookeeper 192.168.25.136:2181 --describe --topic weblogs
    Kafka configuration file (in big data projects Kafka mostly acts as a data buffer in a producer-consumer pattern):

 

# Globally unique broker id; must not be duplicated
broker.id=0

# Port the broker listens on; producers and consumers connect to this port
port=9092

# Number of threads handling network requests
num.network.threads=3

# Number of threads handling disk I/O
num.io.threads=8

# Send buffer size of the socket
socket.send.buffer.bytes=102400

# Receive buffer size of the socket
socket.receive.buffer.bytes=102400

# Maximum size of a socket request
socket.request.max.bytes=104857600

# Directory where Kafka stores its data/log files
log.dirs=/export/logs/kafka

# Number of partitions per topic on this broker
num.partitions=2

# Number of threads used to recover and clean data under each data dir
num.recovery.threads.per.data.dir=1

# Maximum time a segment file is retained before deletion
log.retention.hours=168

# Maximum time before rolling a new segment file
log.roll.hours=168

# Size of each segment file; default 1 GB
log.segment.bytes=1073741824

# Interval for periodically checking segment sizes
log.retention.check.interval.ms=300000

# Whether log cleaning is enabled
log.cleaner.enable=true

# ZooKeeper ensemble the broker uses to store metadata
zookeeper.connect=192.168.25.136:2181,192.168.25.136:2182,192.168.25.136:2183

# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=6000

# Number of messages buffered per partition before flushing to disk
log.flush.interval.messages=10000

# Maximum time messages stay in the buffer before flushing to disk
log.flush.interval.ms=3000

# delete.topic.enable=true is required to really delete topics; otherwise they are only marked deleted
delete.topic.enable=true

# host.name must be this machine's IP (important); otherwise clients fail with
# "Producer connection to localhost:9092 unsuccessful"
host.name=192.168.25.136

        

When the number of records consumed from Kafka equals the total row count of HBase's weblogs table, the Flume-to-HBase and Flume-to-Kafka integration is working correctly.
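One way to check the two counts (a sketch, not taken from the original article; GetOffsetShell prints the latest offset per partition, and for a topic that has not been cleaned the sum of those offsets equals the number of messages produced):

# total number of rows written by the Flume HBase sink
echo "count 'weblogs'" | bin/hbase shell
# latest offsets of the weblogs topic on the broker
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.25.136:9092 --topic weblogs --time -1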

Flume startup log (screenshot):

Row count of the HBase weblogs table (screenshot):

Data consumed from Kafka (screenshot):

 

Appendix: Flume configuration that sinks the collected data into specific HBase columns and into Kafka (important):

a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaS hbaseS

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hotnews/weblogs.log
a1.sources.r1.channels = kafkaC hbaseC

# flume + hbase
# sink configured with HBaseSink and RegexHbaseEventSerializer
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
# HBase table name
a1.sinks.hbaseS.type = org.apache.flume.sink.hbase.HBaseSink
a1.sinks.hbaseS.table = weblogs
# HBase column family
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Regex that splits each news record into the corresponding columns of the column family (xxxx|xxxx|xxxx|xxxx|xxxx)
a1.sinks.hbaseS.serializer.regex = ^(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)
a1.sinks.hbaseS.serializer.colNames = datatime,url,docno,contenttitle,content
#a1.sinks.hbaseS.serializer.rowKeyIndex = 0
# Bind sink and channel
a1.sinks.hbaseS.channel = hbaseC

# flume + kafka
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000

a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = 192.168.25.136:9092
a1.sinks.kafkaS.zookeeperConnect = 192.168.25.136:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder

 

 

Part 3: Kafka + Spark Streaming + MySQL Real-Time Analysis

1. Writing the database connection pool (plain Java version + Scala c3p0 version)
    Note: MySQL is deployed on Linux; the connection settings live in db.properties.
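A minimal db.properties sketch: the key names are exactly the ones read by the two connection-pool classes below, the host, database name and credentials are placeholders, and the encoding parameters come from Q2/A2 at the end of this part:

driverName=com.mysql.jdbc.Driver
url=jdbc:mysql://<mysql-host>:3306/<database>?useUnicode=true&characterEncoding=utf8
username=<user>
password=<password>
maxPoolSize=10
minPoolSize=3
acquireIncrement=2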

Plain Java version:

package cn.yuysy.hotnews.realtime.db;

import java.io.File;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.Properties;

/**
 * Database connection pool
 * Created on 2018-11-15
 * @author tangzhi mail:[email protected]
 */
public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;
    private static Properties prop;

    /**
     * Load the connection settings and register the JDBC driver class
     */
    static {
        try {
            prop = new Properties();
            prop.load(new FileInputStream(new File("C:\\Users\\Administrator\\Hot_News\\src\\main\\scala\\cn\\yuysy\\hotnews\\realtime\\db\\db.properties")));
            Class.forName(prop.getProperty("driverName"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a connection, filling the pool with five connections when it is empty
     */
    public synchronized Connection getConnection() {
        if (connectionQueue == null || connectionQueue.size() == 0) {
            connectionQueue = new LinkedList<Connection>();
            for (int i = 0; i < 5; i++) {
                try {
                    Connection connection = DriverManager.getConnection(
                            prop.getProperty("url"),
                            prop.getProperty("username"),
                            prop.getProperty("password"));
                    connectionQueue.add(connection);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return connectionQueue.poll();
    }

    /**
     * Return a connection to the pool
     */
    public void returnConnection(Connection connection) {
        connectionQueue.add(connection);
    }
}

 

Scala c3p0 version:

 

package cn.yuysy.hotnews.realtime.db

import java.io.{File, FileInputStream, InputStream}
import java.sql.Connection
import java.util.Properties

import com.mchange.v2.c3p0.ComboPooledDataSource
import org.apache.spark.SparkFiles

/**
  * C3P0 database connection pool
  * Created on 2018-11-15
  * @author tangzhi mail:[email protected]
  */
class c3p0ConnectionPool(isLocal: Boolean) extends Serializable {
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
  private val prop = new Properties()
  private var in: InputStream = _

  // Pick the properties source: classpath in local IDEA mode, SparkFiles on a Linux Spark cluster
  isLocal match {
    case true  => in = getClass.getResourceAsStream("db.properties")
    case false => in = new FileInputStream(new File(SparkFiles.get("db.properties")))
  }

  /**
    * Register the connection settings
    */
  try {
    prop.load(in)
    cpds.setJdbcUrl(prop.getProperty("url"))
    cpds.setDriverClass(prop.getProperty("driverName"))
    cpds.setUser(prop.getProperty("username"))
    cpds.setPassword(prop.getProperty("password"))
    cpds.setMaxPoolSize(Integer.valueOf(prop.getProperty("maxPoolSize")))
    cpds.setMinPoolSize(Integer.valueOf(prop.getProperty("minPoolSize")))
    cpds.setAcquireIncrement(Integer.valueOf(prop.getProperty("acquireIncrement")))
  } catch {
    case ex: Exception => ex.printStackTrace()
  }

  def getConnection: Connection = {
    try {
      cpds.getConnection()
    } catch {
      case ex: Exception => ex.printStackTrace()
        null
    }
  }
}

/**
  * Companion object: lazily creates a single pool instance per JVM
  */
object c3p0ConnectionPool {
  var connectionPool: c3p0ConnectionPool = _

  def getc3p0ConnectionPool(isLocal: Boolean): c3p0ConnectionPool = {
    synchronized {
      if (connectionPool == null) {
        connectionPool = new c3p0ConnectionPool(isLocal)
      }
    }
    connectionPool
  }
}
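A short usage sketch of the pool above (isLocal = true reads db.properties from the classpath for a local IDEA run, false reads it from SparkFiles on the cluster):

val pool = c3p0ConnectionPool.getc3p0ConnectionPool(isLocal = true)
val conn = pool.getConnection
// ... run SQL through conn ...
conn.close()   // a c3p0 pooled connection is returned to the pool when close() is called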

 

Q&A

Q1: Running the Spark Streaming program locally against Kafka fails with:
Exception in thread "main" java.lang.AssertionError: assertion failed: No output streams registered, so nothing to execute at scala
A1: No action was triggered on the Spark Streaming pipeline; make sure the DStream ends with an output operation such as print(), foreachRDD() or saveAsTextFiles().

Q2: Data written to MySQL by the Spark Streaming job comes out garbled
A2: Append ?useUnicode=true&characterEncoding=utf8 to the URL in the database connection configuration file.

2. Real-time analysis: approach + partial code

Real-time analysis approach:
    Read from Kafka and take the value (_._2) -----> news records -----> map each value to a Map[String, String] -----> split, group by key and aggregate -----> build the SQL for each key -----> write the results into MySQL
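A minimal sketch of that flow, before the project's own partial code below. Assumptions not taken from the original: the spark-streaming-kafka 0.8 receiver API (KafkaUtils.createStream), contenttitle as the counted key, and a hypothetical MySQL table newscount(name, cnt) with a unique key on name:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import cn.yuysy.hotnews.realtime.db.c3p0ConnectionPool

object NewsTopicStreamingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("hotnews-realtime").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Receive the weblogs topic from Kafka through ZooKeeper (receiver-based 0.8 API)
    val lines = KafkaUtils.createStream(ssc, "192.168.25.136:2181", "hotnews-group",
      Map("weblogs" -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    // "datetime=...|url=...|docno=...|contenttitle=...|content=..."  ->  Map[String, String]
    val counts = lines.map { line =>
      line.split("\\|").map { kv =>
        val p = kv.split("=", 2)
        (p(0), if (p.length > 1) p(1) else "")
      }.toMap
    }.map(m => (m.getOrElse("contenttitle", ""), 1))
     .filter(_._1.nonEmpty)
     .reduceByKey(_ + _)

    // Write each batch into MySQL through the connection pool
    counts.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        val conn = c3p0ConnectionPool.getc3p0ConnectionPool(isLocal = true).getConnection
        val stmt = conn.prepareStatement(
          "insert into newscount(name, cnt) values (?, ?) on duplicate key update cnt = cnt + ?")
        iter.foreach { case (title, cnt) =>
          stmt.setString(1, title); stmt.setInt(2, cnt); stmt.setInt(3, cnt)
          stmt.executeUpdate()
        }
        stmt.close(); conn.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}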

Partial code:

 

package cn.yuysy.hotnews.realtime

import java.io.{File, FileInputStream}
import java.sql.{Connection, Statement}
import java.util.Properties

import cn.yuysy.hotnews.realtime.db.c3p0ConnectionPool
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.