1. 程式人生 > >Kafka在大資料環境中的應用

Kafka在大資料環境中的應用

我們生活在一個數據爆炸的時代,資料的巨量增長給我們的業務處理帶來了壓力,同時巨量的資料也給我們帶來了十分可觀的財富。隨著大資料將各個行業使用者、運營商、服務商的資料整合進大資料環境,或使用者取用大資料環境中海量的資料,業務平臺間的訊息處理將變得尤為複雜。如何高效地採集、使用資料,如何減輕各業務系統的壓力,也變得越來越突出。在早期的系統實現時,業務比較簡單。即便是資料量、業務量比較大,大資料環境也能做出處理。但是隨著接入的系統增多,資料量、業務量增大,大資料環境、業務系統都可出現一定的瓶頸。下面我們看幾個場景。

場景一:我們開發過一個裝置資訊挖掘平臺。這個平臺需要實時將採集網際網路關採集到的路由節點的狀態資訊存入資料中心。通常一個閘道器一次需要上報幾十甚至幾百個變化的路由資訊。全區有幾萬個這種網際網路關。當資訊採集平臺將這些變化的資料資訊寫入或更新到資料庫時候,會給資料庫代理非常大的壓力,甚至可以直接將資料庫搞掛掉。這就對我們的資料採集系統提出了很高的要求。如何穩定高效地把訊息更新到資料庫這一要求擺了出來。

場景二:資料中心處理過的資料需要實時共享給幾個不同的機構。我們常採用的方法是將資料批量存放在資料採集機,分支機構定時來採集;或是分支機構通過JDBC、RPC、http或其他機制實時從資料中心獲取資料。這兩種方式都存在一定的問題,前者在於實時性不足,還牽涉到資料完整性問題;後者在於,當資料量很大的時候,多個分支機構同時讀取資料,會對資料中心的造成很大的壓力,也造成很大的資源浪費。

為了解決以上場景提出的問題,我們需要這樣一個訊息系統:

緩衝能力,系統可以提供一個緩衝區,當有大量資料來臨時,系統可以將資料可靠的緩衝起來,供後續模組處理;

訂閱、分發能力,系統可以接收訊息可靠的快取下來,也可以將可靠快取的資料釋出給使用者。

這就要我們找一個高吞吐的、能滿足訂閱釋出需求的系統。

Kafka是一個分散式的、高吞吐的、基於釋出/訂閱的訊息系統。利用kafka技術可以在廉價PC Server上搭建起大規模的訊息系統。Kafka具有訊息持久化、高吞吐、分散式、實時、低耦合、多客戶端支援、資料可靠等諸多特點,適合線上和離線的訊息處理。

使用kafka解決我們上述提到的問題。

網際網路關採集到變化的路由資訊,通過kafka的producer將歸集後的資訊批量傳入kafka。Kafka按照接收順序對歸集的資訊進行快取,並加入待消費佇列。Kafka的consumer讀取佇列資訊,並一定的處理策略,將獲取的資訊更新到資料庫。完成資料到資料中心的儲存。

資料中心的資料需要共享時,kafka的producer先從資料中心讀取資料,然後傳入kafka快取並加入待消費佇列。各分支結構作為資料消費者,啟動消費動作,從kafka佇列讀取資料,並對獲取的資料進行處理。

Kafka生產的程式碼如下:

public void produce(){

               //生產訊息預處理

    produceInfoProcess();       

    pro.send(ProducerRecord,new Callback(){

                        @Override

                        onCompletion() {

                                 if (metadata == null) {

                                           // 傳送失敗

                                           failedSend();

                                 } else {

                                           //傳送成功!" 

                                           successedSend();     

}

                        }                      

          });  

 }

訊息生產者根據需求,靈活定義produceInfoProcess()方法,對相關資料進行處理。並依據資料釋出到kafka的情況,處理回撥機制。在資料傳送失敗時,定義failedSend()方法;當資料傳送成功時,定義successedSend()方法。

Kafka消費的程式碼如下:

public void consumer() {

                   //配置檔案

        properties();

        //獲取當前資料的迭代器

        iterator = stream.iterator();

        while (iterator.hasNext()) {

            //取出訊息

            MessageAndMetadata<byte[], byte[]> next = iterator.next();

            messageProcess();

             }      

}

Kafka消費者會和kafka叢集建立一個連線。從kafka讀取資料,呼叫messageProcess()方法,對獲取的資料靈活處理。

結論

Kafka的高吞吐能力、快取機制能有效的解決高峰流量衝擊問題。實踐表明,在未將kafka引入系統前,當網際網路關傳送的資料量較大時,往往會掛起關係資料庫,資料常常丟失。在引入kafka後,更新程式能夠結合能力自主處理訊息,不會引起資料丟失,關係型資料庫的壓力波動不會發生過於顯著的變化,不會出現資料庫掛起鎖死現象。

依靠kafka的訂閱分發機制,實現了一次釋出,各分支依據需求自主訂閱的功能。避免了各分支機構直接向資料中心請求資料,或者資料中心依次批量向分支機構傳輸資料以致實時性不足的情況。kafka提高了實時性,減輕了資料中心的壓力,提高了效率。

感謝您的觀看,如有不足之處,歡迎批評指正。

為了幫助大家讓學習變得輕鬆、高效,給大家免費分享一大批資料,幫助大家在成為大資料工程師,乃至架構師的路上披荊斬棘。在這裡給大家推薦一個大資料學習交流圈:658558542 歡迎大家進群交流討論,學習交流,共同進步。

當真正開始學習的時候難免不知道從哪入手,導致效率低下影響繼續學習的信心。

但最重要的是不知道哪些技術需要重點掌握,學習時頻繁踩坑,最終浪費大量時間,所以有有效資源還是很有必要的。

最後祝福所有遇到瓶疾且不知道怎麼辦的大資料程式設計師們,祝福大家在往後的工作與面試中一切順利。