1. 程式人生 > >大資料實戰-電信客服-重點記錄

大資料實戰-電信客服-重點記錄

# 寫在前面的話 最近不是一直在學習大資料框架和引用嘛(我是按照[尚矽谷](http://www.atguigu.com/)[B站](https://space.bilibili.com/302417610?spm_id_from=333.788.b_765f7570696e666f.1)視訊先學習過一遍路線,以後找準方向研究),除了自己手動利用Kafka和HDFS寫一個簡單的分散式檔案傳輸(分散式課程開放性實驗,剛好用上了所學的來練練手)以外,還學習這個學習路線一個專案,[電信客服實戰](https://www.bilibili.com/video/BV17t411W7wZ?p=1)。在這個專案裡面還是學習到了不少內容,包括Java上不足的很多地方、Java工程開發上的要求和少數框架的複習。~~不排除自己太菜,啥都不知道,認為一些常見的東西不常見的情況(哭~~ 鑑於網上有很多類似的內容,這裡我只是將我學習和復code的過程中,學習到的知識和遇到問題的解決方案寫下,以作記錄和回顧。 [1.我的手敲程式碼](https://github.com/ginkgo-code/Telecom-customer-service/tree/master),[2.老師原始碼,含資料和筆記(提取碼:pfbv)](https://pan.baidu.com/s/1kF933V8oN2MFhI0HT5GLKQ)  宣告一下,沒有任何廣告意思,這種渠道是很容易找到並且也很多的,我只是恰好學習了這個,並且**我覺得**還不錯~~(求生欲極強~~ # 電信客服 ## 介紹 專案主要是模擬電信中的資訊部門,從生產環境中獲取通話資訊資料,根據業務需求儲存和分析資料。 ### 業務需求 統計每天、每月以及每年的每個人的通話次數及時長。 ### 專案架構
### 程式碼流程 編寫程式碼的流程分為四步:①資料生產,②資料消費,③資料分析,④資料展示。我在學習過程中,沒有學習資料展示部分。 ## 資料生產 主要任務是利用contact.log(聯絡人檔案)的資料,生成不同聯絡人之間通話記錄的流程。 這個Part,老師有句話我覺得很在理,“大資料開發人員雖然不管資料怎麼來的,怎麼出去的,但是必須知道和了解這個過程才能按照需求code”。~~腦海中閃過中介軟體~~ ### 面向介面程式設計 - 專案第一步 在以前的程式設計學習過程中,總是一股腦兒的猛寫程式碼,雖然我自認為我在我們宿舍已經是模組化思想最為**嚴重**的了,但是從未接觸到面向介面程式設計。這學期也學習了軟體工程,(雖然我們學得很水),這門課雖然不是在教我們寫程式碼,但卻是教我們如何正確的做專案和寫程式碼(暈。 面向介面程式設計也是如此,在這個專案中,瞭解了我們的資料來源和需求後,第一步要做的是弄清楚需要的物件和需要的功能,即介面,在共同的模組中確定好介面和介面的方法簽名,接下來才是對介面模組的實現和實現業務。 在這個專案中,建立了一個ct-common模組作為公共模組,簡單介紹幾個: | 介面或抽象類 | 描述 | | :----------: | :----------------------------------------------------------: | | Val | 一般資料都需要的實現的介面,只包括名稱意義上的獲取值value()方法 | | DataIn | 資料輸入介面,功能有設定輸入路徑,讀取資料,故存在setPath()和read()方法 | | Producer | 資料生產者介面,功能有獲取輸入資訊,設定生產輸出和生產,故存在setIn()和和setOut()和produce()方法 | 下圖是ct-common的程式碼結構:
### 封裝物件 - 提高擴充套件性 這個思想其實我在之前的編碼過程中就有點領悟了,之所以在這裡提出,是因為在這個跟進過程中,更加體會到Java編碼就是各種物件組合呼叫的含義。~~或許是老師專案拉得太快,讓我感覺自己太菜,skrskr~~ 我之前編碼過程中,也會不停的封裝物件,但一般都是那些很明顯的功能整合物件,更別說是對資料進行封裝成資料整合物件了。換句話說,就是我之前封裝的物件都是含有一定動作的(除了getter&setter)。但是對於一些物件之間傳遞的資料,如果每次都傳相同的資料並且數量>1的話,最好的是封裝成物件,提高擴充套件性。在業務需要增添一個數據傳遞的情況下,封裝資料物件只需要更改物件的屬性和物件的構成,否則每個傳遞的地方(語句)都需要增添傳遞的資料變數。
下面用一張圖表示,在該專案中封裝Calllog和Contact物件的效果: 如果不封裝物件,如果聯絡人物件裡面在加入一個new item(比如性別),那麼幾乎所有的地方都需要修改;反之,只需要在Contact類中增添new item屬性和在Calllog中增添A.new&B.new屬性,以及修改構造方法就可以了,同時在Producer過程中,沒有增添和修改過多程式碼。
### 資料生產總結 在這個Part中主要還是熟悉任務就可以完成,沒遇到什麼問題。如果不用上述的tricks那這不就是一個讀入檔案和寫入檔案的程式碼嘛~~(我一main方法就能搞定)~~,但是用了之後感覺就明顯不同,更加工程化,邏輯感更強。 ## 資料消費 主要操作是利用Flume和Kafka將收集不斷生產的資料,並且將資料插入到HBase中。 ### 新概念 主要是學到了一些新的知識,還有知識的簡單運用,我並沒有深究這些新概念~~(估計得學到頭禿)~~。 - [類載入器](https://www.cnblogs.com/dongguacai/p/5879931.html):類載入器是負責將可能是網路上、也可能是磁碟上的class檔案載入到記憶體中。併為其生成對應的java.lang.class物件。 有**三種類載入器**,分別按照順序是啟動類載入器BootstrapClassLoader、擴充套件類載入器Extension ClassLoader和系統類載入器App ClassLoader。還存在一種**雙親委派模型**,簡單的意思就是說當一個類載入器收到載入請求時,首先會向上層(父)類載入器發出載入請求。並且每一個類載入器都是如此,所以每個類載入器的請求都會被傳遞到最頂層的類載入器中,一開始我覺得很麻煩,不過這確實可以避免類的重複載入。 在電信客服的專案中,類載入器被用於載入resource資料夾的配置檔案。 ```java Properties prop = new Properties(); // 利用類載入器獲取配置檔案 prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties")); ``` - [ThreadLocal](https://www.jianshu.com/p/3c5d7f09dfbd):這是一個執行緒內維護的儲存變數陣列。舉個簡單的比方,在Java執行的時候有多個執行緒,存在一個Map,K就是每個執行緒的Id,V則是每個執行緒記憶體儲的資料變數。 這是多執行緒相同的變數的訪問衝突問題解決方法之一,是通過給每個執行緒單獨一份儲存空間(犧牲空間)來解決訪問衝突;而熟悉的Synchronized通過等待(犧牲時間)來解決訪問衝突。同時ThreadLocal還具有執行緒隔離的作用,即A執行緒不能訪問B執行緒的V。 在電信客服的專案中,ThreadLocal被用來持久化Connection和Admin連線。因為在HBase的DDL和DML操作中,不同的操作都需要用到連線,所以將其和該執行緒進行繫結,加快獲取的連線的速度和減少記憶體佔用。~~當然也可以直接new 幾個物件,最後統一關閉。~~ ```java // 通過ThreadLocal保證同一個執行緒中可以不重複建立連線和Admin。 private ThreadLocal connHolder = new ThreadLocal(); private ThreadLocal adminHolder = new ThreadLocal(); private Connection getConnection() throws IOException { Connection conn = connHolder.get(); if (conn == null) { Configuration conf = HBaseConfiguration.create(); conn = ConnectionFactory.createConnection(conf); connHolder.set(conn); } return conn; } private Admin getAdmin() throws IOException { Admin admin = adminHolder.get(); if (admin == null) { getConnection(); admin = connHolder.get().getAdmin(); adminHolder.set(admin); } return admin; } ``` ### 分割槽鍵和RowKey的設計 分割槽鍵的設計一般是機器數量。rowKey的設計基於表的分割槽數,並且滿足**長度原則**(10~100KB即可,最好是8的倍數)、**唯一性原則**和**雜湊性原則**(負載均衡,防止出現數據熱點) #### 分割槽鍵 本專案中共6個分割槽,故分割槽號為"0|"、"1|"、"2|"、"3|"、"4|"。舉一個例子,3\*\*\*\*的第二位無論是任何數字都會小於"|"(第二大的字元),所以"2|"<"3\*\*\*\*"<"3|",故分到第四個分割槽。 #### RowKey 設計好了分割槽鍵後,rowKey的設計主要是根據業務需求哪些資料需要聚集在一起方便查詢,那就利用那些資料設計資料的分割槽號。 資料含有主叫使用者(13312341234)、被叫使用者(14443214321)、通話日期(20181010)和通話時長(0123)。業務要求我們將經常需要統計一個使用者在某一月內的通話記錄,即主叫使用者和通話日期中的年月是關鍵資料。根據這些資料計算分割槽號,保證同一使用者在同一月的通話記錄在HBase上是緊鄰的(還有一個前提要求是rowkey還必須是分,分割槽號+主叫使用者+通話日期+others,否則在一個分割槽上還是有可能是亂的)。下面是計算分割槽號的程式碼: ```java /** * 計算得到一條資料的分割槽編號 * * @param tel 資料的主叫電話 * @param date 資料的通話日期 * @return regionNum 分割槽編號 */ protected int genRegionNum(String tel, String date) { // 獲取電話號碼的隨機部分 String userCode = tel.substring(tel.length() - 4); // 獲取年月 String yearMonth = date.substring(0, 6); // 雜湊 int userCodeHash = userCode.hashCode(); int yearMonthHash = yearMonth.hashCode(); // crc 迴圈冗餘校驗 int crc = Math.abs(userCodeHash ^ yearMonthHash); // 取餘,保證分割槽號在分割槽鍵範圍內 int regionNum = crc & ValueConstants.REGION_NUMS; return regionNum; } ``` #### 查詢方法 例子:查詢13312341234使用者在201810的通話記錄 startKey <- genRegionNum("13312341234","201810")+"\_"+"13312341234"+"\_"+"201810" endKey <- genRegionNum("13312341234","201810")+"\_"+"13312341234"+"\_"+"201810"+"|" ### 協處理器 #### 引入的原因 電信客服中通常需要計算兩個客戶之間親密度,計算的資料來源於兩者的通話記錄。舉個例子,計算A和B的親密度,那要A和B之間的通話記錄,特別注意的是不僅需要A call B的記錄,還需要B call A的記錄。 - 第一,最無腦的方法是啥也不做~~(憨憨~~,在查詢的時候通過scan中的filter對rowkey進行過濾查詢,這樣子每需要查詢全表,速度過慢。 - 第二,最直觀的方法是接收到Kafka的一條資料後,插入兩條資料,主叫使用者和被叫使用者換個位置第二次插入HBase時加上一個標誌位Flag,標識第一個電話號碼(HBase中的列稱為call1)是否是主叫使用者。 - 第三,顯然一條資料是重複了兩次,那麼在查詢的時候(無關親密度)出現兩次,即影響查詢速度。所以優化的方法重複的資料單獨新建一個列族,在查詢的時候只需要在一個列族中查詢。即減少了資料量,畢竟HBase針對表的存一個個store進行儲存的。 - 第四,這樣子擴充套件性太低,要是需要重複幾十次,那編碼效率和插入效率也太低了,故在HBase中引入了協處理相當於MySQL中的**觸發器**,協處理器部署在RegionServer上。 #### 協處理器的設計 就好比MySQL中的觸發器一樣,MySQL的觸發器有針對update、insert和delete的,還有before和after等等,協器也有類似的對應函式。比如,在本專案中,需要的是再插入一條資料後,協處理器被觸發插入另外一條“重複資料”,所以複寫的方法是postPut。 設計具體邏輯是:根據插入的Put獲得插入的資料資訊,然後判斷插入的標誌位Flag是不是1,如果是1,則插入另外一條重複資料。
下面是程式碼: ```java public class InsertCalleeCoprocessor extends BaseRegionObserver { /** * 這是HBase上的協處理器方法,在一次Put之後接下來的動作 * * @param e * @param put * @param edit * @param durability * @throws IOException */ @Override public void postPut(ObserverContext e, Put put, WALEdit edit, Durability durability) throws IOException { // 1. 獲取表物件 Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue())); // 2. 構造Put // 在rowKey中存在很多資料資訊,這一點就不具備普適性 String values = Bytes.toString(put.getRow()); String[] split = values.split("_"); String call1 = split[1]; String call2 = split[2]; String callTime = split[3]; String duration = split[4]; String flag = split[5]; // 在協處理器中也發生了Put操作,但是此時的Put不引發協處理器再次響應 // 必須得關閉表連線 if ("0".equals(flag)) { table.close(); return; } CoprocessorDao dao = new CoprocessorDao(); String rowKey = dao.genRegionNums(call2, callTime) + "_" + call2 + "_" + call1 + "_" + callTime + "_" + duration + "_" + "0"; Put calleePut = new Put(Bytes.toBytes(rowKey)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("call1"), Bytes.toBytes(call2)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("call2"), Bytes.toBytes(call1)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("callTime"), Bytes.toBytes(callTime)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("duration"), Bytes.toBytes(duration)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("flag"), Bytes.toBytes("0")); // 3. 插入Put table.put(calleePut); // 4. 關閉資源,否則記憶體會溢位 table.close(); } private class CoprocessorDao extends BaseDao { public int genRegionNums(String tel, String date) { return super.genRegionNum(tel, date); } } } ``` 需要注意的問題: 1. 編完協處理器程式碼後需要修改建立表的資料,在新增的表描述器上新增編寫的協處理器類全路徑,並且將打包發給叢集,記住分發。 2. 判斷標誌位是否為1,為1才被觸發,因為協處理器觸發傳送的“重複”資料也會被協處理器自身感應到。 3. 在協處理器上面插入資料後,要關閉表的連線,否則記憶體會溢位。 ### 遇到的問題和解決方案 - 在老師給的程式碼,在執行過程中,發現slave1和slave2中RegionServer掛掉了,然後我手動啟動並且檢視HBase中的資料,觀察到資料存在並且無誤,然後在master:16010上觀察所有的分割槽都在master的RegionServer上,正常。 但是為什麼會我的掛掉呢,明明虛擬機器的配置是一樣的,果斷檢視日誌發現out of memory,記憶體溢位了,心念一轉怕不是程式碼有問題。果不其然,在程式碼中,先是開啟的table連線,然後進行標誌位的判斷,如果為1傳送資料後關閉連線,但是在標誌位為0的時候沒有關閉連線,所以記憶體才會溢位,修改完事兒! - 這裡就是笨逼(沒錯,就是我)犯下的錯誤,我在修改完後打包上傳......怎麼出錯了,我幾乎整了半天才發現我居然沒分發!!!分發後就可以看到較好的效果。這就完了?我講講最後我是怎麼發現沒分發,沒分發的過程中slave1和slave2的RegionServer總是掛掉,並且還是記憶體出錯(所以我才懵,當時我覺得是我的機子不行,~~換機子~~,所以直接kill slave1和slave2的RegionServer在開始執行,得到了和之前相同的結果,但是記憶體的問題我應該是解決了的,所以那隻可能是程式碼的問題了。下載下來一看,不一樣,我懂了,Nicer,這就賞自己兩嘴巴子!(哭 當然,這也是一次記憶深刻的debug!!! ### 總結 這個流程是我學習最多的流程,除了複習這個大資料框架的API,更多的是對我的Java有了更多的拓展。除了上述提到的,還有一些註解,泛型和泛型的PECS原則等等。另外就是學習怎麼一步步排除錯誤和尋找自己的(低階)錯誤的方法了,這種DeBug的方式對於我來說很新鮮。 ## 資料分析 同時利用redis快取資料,利用MapReduce將HBase中的資料提取到MySQL中。 ### DeBug分析 **出現的問題:**MapReduce任務執行成功,但是MySQL中未插入資料,同時檢視MapReduce8088埠,看不到日誌,顯示no log for container available。 **問題分析:** 1.觀察MapReduce的任務,發現Reduce的確是正確輸出了位元組,但是MySQL沒有插入資料,那隻可能是編寫的OutputFormat出現了問題。 2.no log for container available, 在網上查閱資料提示有可能是記憶體不足的問題。 3.檢視MapReduce的Reduce任務,發現是在nodemanager是在slave1上執行,而slave1只分配了2G記憶體。 4.kill slave1和slave2的nodemanager,只執行master的nodemanager,因為master我分配了4G記憶體。 5.檢視日誌成功,尋找錯誤。 6.發現是MySQL語句出現了語法錯誤????????~~(離譜,就**離譜)~~ 7.修改MySQL語句,任務成功執行。 # 總結 這是我的一個學習上手的大資料專案,雖然簡單但是也學習不少。做這個專案的時候是考試周,也算是忙裡偷閒完成了!主要是這個專案和我們小隊準備參加的服創大賽的專案很類似,也算是提前練練手,熟悉下基本的流程。不過我們小隊的專案最好還是得上Spark和好的機器~~(虛擬機器老拉跨~~,所以繼續學習!!! ### 人生此處,絕