大資料專案實戰之新聞話題統計分析
前言:本文是一個完整的大資料專案實戰,實時|離線統計分析使用者的搜尋話題,並用JavaEE工程前端介面展示出來。這些指標對網站的精準營銷、運營都有極大幫助。架構大致是按照企業標準來的,從日誌的採集、轉化處理、實時計算、JAVA後臺開發、WEB前端展示,一條完整流程線下來,甚至每個節點都用的高可用架構,都考慮了故障轉移和容錯性。所用到的框架包括 :Hadoop(HDFS+MapReduce+Yarn)+Flume+KafKa+Hbase+Hive+Spark(SQL、Streaming )+Hive+Mysql+SpringMVC+Mybatis+Websocket+AugularJs+Echarts
專案部署地址:http://120.79.35.74:443/Hot_News_Web/
專案原始碼聯絡郵箱:[email protected]
專案架構圖:
一:資料來源處理(搜狗實驗室獲取新聞資源 XML——>TXT:java解析大批量xml檔案 程式碼後貼)
處理思路:利用SAXReader獲取xml檔案內容,並構建News實體類以便寫入txt檔案,然後編寫ReadWebLog類並編寫指令碼執行在Liunx上模擬新聞搜尋日誌產生
Liunx執行jar命令:java -jar 你的上傳jar包所在目錄 args0 args1
或Shell指令碼命令:
#/bin/bash
echo "start log"
java -jar 你的上傳jar包所在目錄 args0 args1
程式碼:
處理搜狗實驗室元資料.xml----->txt
package cn.yusys.hotnews.datasource;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
/**
* 解析搜狗實驗室新聞xml檔案為txt檔案---->專案資料來源
* @author Tangzhi mail:[email protected]
* Created on 2018年11月12日
*/
public class MyXMLReader2JDOM {
public static void main(String[] args) {
// 獲取xml檔案讀取流
SAXReader reader = new SAXReader();
// 設定字符集編碼方式
reader.setEncoding("utf-8");
Document document;
Element rootElement;
List<Element> docList;
Iterator<Element> iterator;
// 用於存放節點資料以便後面的寫入之news.log
ArrayList<News> list = new ArrayList<News>();
// 開始進行讀取
try {
document = reader.read(new File("D:\\Downloads\\大資料資料來源\\news_tensite_xml.smarty.dat"));
// 得到根節點元素 <docs>...</docs>
rootElement = document.getRootElement();
//<doc>...<doc>
docList = rootElement.elements("doc");
/*
* 得到xml具體配置檔案資訊
*/
iterator = docList.iterator();
for (Element e : docList) {
News news = new News();
/**
* 遍歷子節點將具體新聞資訊寫入txt檔案
*/
if (e.element("url") != null && !" ".equals(e.element("url"))) {
news.setUrl(e.element("url").getStringValue().trim());
}
if (e.element("docno") != null && !" ".equals(e.element("docno"))) {
news.setDocno(e.element("docno").getStringValue().trim());
}
if (e.element("contenttitle") != null && !" ".equals(e.element("contenttitle"))) {
news.setContenttitle(e.element("contenttitle").getStringValue().trim());
}
if (e.element("content") != null && !" ".equals(e.element("content"))) {
news.setContent(e.element("content").getStringValue().trim());
}
list.add(news);
}
/**
* 進行寫入txt檔案
*/
writwToFile(list);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 寫入txt檔案(後期當源資料檔案過大時進行分片處理)
* @throws IOException
*/
public static void writwToFile(List<News> list) throws IOException {
File file = new File("D:\\Downloads\\大資料資料來源\\news2.log");
BufferedWriter bw = new BufferedWriter(new FileWriter(file));
if (!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
} else {
for (News news : list) {
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
bw.write("datetime"+"="+dateStr+"|");
bw.write("url"+"="+news.getUrl()+"|");
bw.write("docno"+"="+news.getDocno()+"|");
bw.write("contenttitle"+"="+news.getContenttitle()+"|");
bw.write("content"+"="+news.getContent());
bw.write("\n");
bw.flush();
}
}
}
}
----------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------
package cn.yusys.hotnews.datasource;
/**
*xml解析時新聞實體類
*/
public class News implements Serializable{
// 實現序列化介面以便多臺機器同時解析
public News () {
}
public News(String url, String docno, String contenttitle, String content) {
super();
this.url = url;
this.docno = docno;
this.contenttitle = contenttitle;
this.content = content;
}
String url;
String docno;
String contenttitle;
String content;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getDocno() {
return docno;
}
public void setDocno(String docno) {
this.docno = docno;
}
public String getContenttitle() {
return contenttitle;
}
public void setContenttitle(String contenttitle) {
this.contenttitle = contenttitle;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
-----------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------
執行在Liunx上模擬日誌產生並通過flume採集
package cn.yusys.hotnews.datasource;
import java.io.*;
/**
* 模擬日誌伺服器產生日(從news.log/news1.log中隨機切換檔案讀取資料然後寫入日誌檔案-----》然後使用進行flume採集)
* @author Tangzhi mail:[email protected]
* @date 2018年11月12日
*/
public class ReadWebLog {
public static String readFileName;
public static String writeFileName;
public static void main (String[] args) {
readFileName = args[0];
writeFileName = args[1];
readFile(readFileName);
}
/**
* 從new.log/news1.log中隨機讀取日誌資訊
*/
public static void readFile(String fileName){
try {
FileInputStream fs = new FileInputStream(fileName);
// 轉換流
InputStreamReader isr = new InputStreamReader(fs,"utf-8");
BufferedReader br = new BufferedReader(isr);
int count = 0;
while (br.readLine() != null){
String line = br.readLine();
count ++;
// 自定義讀取間隔毫秒
Thread.sleep(1000);
System.out.println("row:" + count + ">>>>>>>>" + line);
/**
* 寫入到指定檔案中(與flume配置檔案對應)
*/
writeFile(writeFileName,line);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 檔案內容的寫入
*/
public static void writeFile (String fileName,String line) {
try {
FileOutputStream fs = new FileOutputStream(fileName, true);
OutputStreamWriter osw = new OutputStreamWriter(fs);
BufferedWriter bw = new BufferedWriter(osw);
// 執行檔案內容的寫入
bw.write(line);
bw.write("\n");
bw.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Q&A
Q1:
Java異常: "2 位元組的 UTF-8 序列的位元組 2 無效。"
A1:利用記事本開啟 另行儲存編碼格式為UTF-8 再Notepad++(其他編輯器亦可)用開啟即可
Q2 :
在Liunx系統上執行jar時出現找不到主類
A1 :使用IDEA時pom.xml加入以下依賴並在<mainClass></mainClass>部分寫入你類全路徑
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Liunx效果圖:
附 :當資料來源檔案很大時,可以在ReadWebLog類中進行隨機讀取多個log檔案設定也可是模擬資料更為真實
搜狗實驗室:中文資訊處理以及部分資料提供 http://www.sogou.com/labs/
注:根據專案需求進行資料前期清洗
到此:資料來源獲取 簡單清洗(uid設定 時間戳繫結 部分資料過濾)已經完成
二 :Flume與HBase、Kafka資料對接
處理思路:Flume採集資料來源資料與HBase整合----->HBase與Hive對接------>Hadoop MapReduce 完成離線計算分析----->前臺Echart
Flume採集資料來源資料與Kafka整合----->Kafka與Spark Streaming對接----->Spark + MySql 完成實時計算分析----->前臺Echart
1.Flume與HBase
Tips:此時Flume Sink 下沉目標為HBase (前提:Liunx環境安裝HBase,理解HBase原理)修改hbase-site.xml 修改hbase資料目錄 zookeeper地址
HBase常用Shell命令:
啟動HBase: strat-hbase.sh
Shell命令互動模式:./hbase shell
建立表 : create '表名', '列族名1','列族名2','列族名N'.......
HBase配置檔案下圖:
##hbase-env.sh
export JAVA_HOME=/opt/jdk1.7.0_65 ----自己虛擬機器jdk路徑(etc/profile)
export JAVA_CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export HBASE_MANAGES_ZK=false ----不使用HBase自帶zookeeper使用外部zookeeper叢集
##hbase-site.xml
<configuration>
<property>
<name>hbase.master</name>
<value>192.168.25.136:60000</value>
</property>
<property>
<name>hbase.master.maxclockskew</name>
<value>180000</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://192.168.25.136:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>192.168.25.136</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/tmp/hbase_data</value>
</property>
</configuration>
##regionservers
localhost ---填寫叢集IP地址或主機名
Q&A
Q1:建立HBase表時報錯: ERROR:Can not get address from Zookeeper; znode data == null
A1:1、確保zookeeper叢集可用 Hadoop叢集可用 HBase服務正常啟動
2、vi hbase-site.xml 檢視HBase資料存放目錄許可權是否為可讀可寫
Q2:HBase0.9.4 通過指令碼啟動後建立表時報錯
ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times
A2 :
1.檢視hadoop叢集 zookeeper叢集是否可用
2.HBase 0.9.4 與 Hadoop 2.x 相容性差 版本不一致 導致無法進行RPC通訊 建議搭配:HBase 0.9.9.x + Hadoop 2.x但請注意此時
Flume最好選擇1.7.x 原因見下圖:
1.Flume與Kafka
Tips:此時Flume Sink 下沉目標為Kafka(前提:Liunx環境安裝Kafka,理解Kafka原理)
Kafka常用Shell命令:
啟動kafka: bin/kafka-servers-start ../config/server.properties (指定特定檔案啟動)
建立主題:bin/kafka-topics.sh --create --zookeeper 192.168.25.136:2181 --replication-factor 1 --partitions 1 --topic weblogs
刪除主題:bin/kafka-topics.sh --delete --zookeeper 192.168.25.136:2181 --topic weblogs
控制檯消費topic的資料:bin/kafka-console-consumer.sh --zookeeper 192.168.25.136:2181 --topic weblogs --from-beginning
控制檯生產資料:bin/kafka-console-producer.sh --broker-list 192.168.25.136:9092 --topic weblogs
檢視主題具體資訊:bin/kafka-topics.sh --zookeeper 192.168.25.136:2181 --describe --topic weblogs
Kafka配置檔案 (kafka在大資料專案中大多作為資料緩衝區 生產者-消費者模式)
#broker的全域性唯一編號,不能重複
#用來監聽連結的埠,producer或consumer將在此埠建立連線
port=9092
#處理網路請求的執行緒數量
num.network.threads=3
#用來處理磁碟IO的執行緒數量
num.io.threads=8
#傳送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接受套接字的緩衝區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩衝區大小
socket.request.max.bytes=104857600
#kafka執行日誌存放的路徑
log.dirs=/export/logs/kafka
#topic在當前broker上的分片個數
num.partitions=2
#用來恢復和清理data下資料的執行緒數量
num.recovery.threads.per.data.dir=1
#segment檔案保留的最長時間,超時將被刪除
log.retention.hours=168
#滾動生成新的segment檔案的最大時間
log.roll.hours=168
#日誌檔案中每個segment的大小,預設為1G
log.segment.bytes=1073741824
#週期性檢查檔案大小的時間
log.retention.check.interval.ms=300000
#日誌清理是否開啟
log.cleaner.enable=true
#broker需要使用zookeeper儲存meta資料
zookeeper.connect=192.168.25.136:2181,192.168.25.136:2182,192.168.25.136:2183
#zookeeper連結超時時間
zookeeper.connection.timeout.ms=6000
#partion buffer中,訊息的條數達到閾值,將觸發flush到磁碟
log.flush.interval.messages=10000
#訊息buffer的時間,達到閾值,將觸發flush到磁碟
log.flush.interval.ms=3000
#刪除topic需要server.properties中設定delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true
#此處的host.name為本機IP(重要),如果不改,則客戶端會丟擲:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=192.168.25.136
當kafka消費的資料與HBase的weblogs表記錄總數相等時說明已完成Flume與HBase、Kafka資料對接
Flume啟動日誌圖:
HBase count表中資料:
Kafka消費資料:
附:Flume採集資料按特定列下沉至Hbase、Kafka配置檔案(重點)
a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaS hbaseS
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hotnews/weblogs.log
a1.sources.r1.channels = kafkaC hbaseC
# flume + hbase
# sink 配置為HBaseSink 和 SimpleHbaseEventSerializer
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
#HBase表名
a1.sinks.hbaseS.type = org.apache.flume.sink.hbase.HBaseSink
a1.sinks.hbaseS.table = weblogs
#HBase表的列族名稱
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
#正則匹配新聞資料去到對應的列族下的對應列(xxxx|xxxx|xxxx|xxxx|xxxx)
a1.sinks.hbaseS.serializer.regex = ^(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)
a1.sinks.hbaseS.serializer.colNames =datatime,url,docno,contenttitle,content
#a1.sinks.hbaseS.serializer.rowKeyIndex = 0
# 組合sink和channel
a1.sinks.hbaseS.channel = hbaseC
# flume + kafka
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = 192.168.25.136:9092
a1.sinks.kafkaS.zookeeperConnect = 192.168.25.136:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder
三 :Kafka+Spark Streaming +MySql 實時計算分析
1、資料庫連線池編寫(Java原生版 + Scala c3p0版)
附:MySql部署在Liunx,連線資訊配置在db.properties
Java原生版:
package cn.yuysy.hotnews.realtime.db;
import java.io.File;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.Properties;
/**
* 資料庫連線池
* Created on 2018-11-15
* @author @author tangzhi mail:[email protected]
*/
public class ConnectionPool {
private static LinkedList<Connection> connectionQueue;
private static Properties prop ;
/**
* 驅動類
*/
static {
try {
prop = new Properties();
prop.load(new FileInputStream(new File("C:\\Users\\Administrator\\Hot_News\\src\\main\\scala\\cn\\yuysy\\hotnews\\realtime\\db\\db.properties")));
Class.forName(prop.getProperty("driverName").toString());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 獲取連線物件
*/
public synchronized Connection getConnection () {
if (connectionQueue == null || connectionQueue.size() == 0) {
connectionQueue = new LinkedList<Connection>();
for (int i = 0;i < 5;i ++) {
try {
Connection connection = DriverManager.getConnection(prop.getProperty("url").toString(), prop.getProperty("username").toString(), prop.getProperty("password").toString());
connectionQueue.add(connection);
} catch (SQLException e) {
e.printStackTrace();
}
}
}
return connectionQueue.poll();
}
/**
* 歸還連線至連線池
*/
public void returnConnection(Connection connection) {
connectionQueue.add(connection);
}
}
Scala c3p0版:
package cn.yuysy.hotnews.realtime.db
import java.io.{File, FileInputStream, InputStream}
import java.sql.Connection
import java.util.Properties
import com.mchange.v2.c3p0.ComboPooledDataSource
import org.apache.spark.SparkFiles
/**
* C3P0資料庫連線池
* Created on 2018-11-15
* @author tangzhi mail:[email protected]
*/
class c3p0ConnectionPool(isLocal: Boolean) extends Serializable {
private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
private val prop = new Properties()
private var in: InputStream = _
isLocal match {
case true => in = getClass.getResourceAsStream("db.properties") //本地IDEA模式
case false => in = new FileInputStream(new File(SparkFiles.get("db.properties"))) //Liunx spark叢集模式
}
/**
* 註冊連線
* @return
*/
try {
prop.load(in);
cpds.setJdbcUrl(prop.getProperty("url").toString())
cpds.setDriverClass(prop.getProperty("driverName").toString())
cpds.setUser(prop.getProperty("username").toString())
cpds.setPassword(prop.getProperty("password").toString())
cpds.setMaxPoolSize(Integer.valueOf(prop.getProperty("maxPoolSize").toString()));
cpds.setMinPoolSize(Integer.valueOf(prop.getProperty("minPoolSize").toString()));
cpds.setAcquireIncrement(Integer.valueOf(prop.getProperty("acquireIncrement").toString()))
} catch {
case ex: Exception => ex.printStackTrace()
}
def getConnection: Connection={
try {
cpds.getConnection()
} catch {
case ex: Exception => ex.printStackTrace()
null
}
}
object c3p0ConnectionPool{
var connectionPool: c3p0ConnectionPool = _
def getc3p0ConnectionPool(isLocal: Boolean): c3p0ConnectionPool = {
synchronized {
if (connectionPool == null) {
connectionPool = new c3p0ConnectionPool(isLocal)
}
}
connectionPool
}
}
}
Q&A
Q1:本地執行spark streaming 程式讀取kafka資料報錯:
Exception in thread "main" java.lang.AssertionError: assertion failed: No output streams registered, so nothing to execute at scala
A1:spark streaming 運算元執行沒有觸發Action 以下為常見action
Q2:spark streamign 實時計算處理後的資料寫入MySqL亂碼
A2:在資料庫連線配置檔案的URL後新增: ?useUnicode=true&characterEncoding=utf8即可
2、實時分析思路 + 部分程式碼
實時分析思路:
從kafka讀取資料後(_._2)----->新聞資料------>先將value對映為Map[String,String]----->切割、根據key分組、聚合----->根據key值寫sql------>寫入MySql成功
部分程式碼:
package cn.yuysy.hotnews.realtime
import java.io.{File, FileInputStream}
import java.sql.{Connection, Statement}
import java.util.Properties
import cn.yuysy.hotnews.realtime.db.c3p0ConnectionPool
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.