1. 程式人生 > >sparkstreaming和kafka整合的兩種方式

sparkstreaming和kafka整合的兩種方式

-1,基於接收者Receiver-based的方法

運算元:KafkaUtils.createStream 
方法:PUSH,從topic中去推送資料,將資料推送過來 
API:呼叫的Kafka高階API 
效果:SparkStreaming中的Receivers,恰好Kafka有釋出/訂閱 ,然而:此種方式企業不常用,說明有BUG,不符合企業需求。因為:接收到的資料儲存在Executor的記憶體,會出現資料漏處理或者多處理狀況 
解釋:這種方法使用Receiver來接收資料。Receiver是使用Kafka高階消費者API實現的。與所有的接收者一樣,通過Receiver從Kafka接收的資料儲存在Spark執行程式exector中,然後由Spark Streaming啟動的作業處理資料。但是,在預設配置下,這種方法可能會在失敗時丟失資料。為了確保零資料丟失,您必須在Spark Streaming(在Spark 1.2中引入)中額外啟用寫入日誌,同時儲存所有接收到的Kafka資料寫入分散式檔案系統(例如HDFS)的預先寫入日誌,以便所有資料都可以在失敗時恢復。 
缺點: 

  • 配置spark.streaming.receiver.writeAheadLog.enable引數,每次處理之前需要將該batch內的日誌備份到checkpoint目錄中,這降低了資料處理效率,反過來又加重了Receiver端的壓力;另外由於資料備份機制,會受到負載影響,負載一高就會出現延遲的風險,導致應用崩潰。
  • 採用MEMORY_AND_DISK_SER降低對記憶體的要求。但是在一定程度上影響計算的速度
  • 單Receiver記憶體。由於receiver也是屬於Executor的一部分,那麼為了提高吞吐量,提高Receiver的記憶體。但是在每次batch計算中,參與計算的batch並不會使用到這麼多的記憶體,導致資源嚴重浪費。
  • 提高並行度,採用多個Receiver來儲存Kafka的資料。Receiver讀取資料是非同步的,並不參與計算。如果開較高的並行度來平衡吞吐量很不划算。
  • Receiver和計算的Executor的非同步的,那麼遇到網路等因素原因,導致計算出現延遲,計算佇列一直在增加,而Receiver則在一直接收資料,這非常容易導致程式崩潰。
  • 在程式失敗恢復時,有可能出現數據部分落地,但是程式失敗,未更新offsets的情況,這導致資料重複消費。

為了回闢以上問題,降低資源使用,我們後來採用Direct Approach來讀取Kafka的資料,具體接下來細說。

-2,直接方法(無接收者)

運算元:KafkaUtils.createDirectStream 
方式:PULL,到topic中去拉取資料。 
API:kafka低階API 

Direct方式採用Kafka簡單的consumer api方式來讀取資料,無需經由ZooKeeper,此種方式不再需要專門Receiver來持續不斷讀取資料。當batch任務觸發時,由Executor讀取資料,並參與到其他Executor的資料計算過程中去。driver來決定讀取多少offsets,並將offsets交由checkpoints來維護。Direct方式無需Receiver讀取資料,而是需要計算時再讀取資料,所以Direct方式的資料消費對記憶體的要求不高,只需要考慮批量計算所需要的記憶體即可。
效果:每次到Topic的每個分割槽依據偏移量進行獲取資料,拉取資料以後進行處理,可以實現高可用 
解釋:在Spark 1.3中引入了這種新的無接收器“直接”方法,以確保更強大的端到端保證。這種方法不是使用接收器來接收資料,而是定期查詢Kafka在每個topic+分partition中的最新偏移量,並相應地定義要在每個批次中處理的偏移量範圍。當處理資料的作業啟動時,Kafka簡單的客戶API用於讀取Kafka中定義的偏移範圍(類似於從檔案系統讀取檔案)。請注意,此功能在Spark 1.3中為Scala和Java API引入,在Spark 1.4中針對Python API引入。 
優勢: 
①、簡化的並行性:不需要建立多個輸入Kafka流並將其合併。與此同時directStream,Spark Streaming將建立與使用Kafka分割槽一樣多的RDD分割槽,這些分割槽將全部從Kafka並行讀取資料。所以在Kafka和RDD分割槽之間有一對一的對映關係,這更容易理解和調整。

②、效率:在第一種方法中實現零資料丟失需要將資料儲存在預寫日誌中,這會進一步複製資料。這實際上是效率低下的,因為資料被有效地複製了兩次,一次是由Kafka,另一次是由預先寫入日誌(Write Ahead Log)複製。此方法消除了這個問題,因為沒有接收器,因此不需要預先寫入日誌。只要你有足夠的kafka保留,訊息可以從kafka恢復。

③、精確語義:第一種方法是使用Kafka的高階API在Zookeeper中儲存消耗的偏移量。傳統上這是從Kafka消費資料的方式。雖然這種方法(合併日誌)可以確保零資料丟失,但在某些失敗情況下,很小的機率兩次資料都同時丟失,發生這種情況是因為Spark Streaming可靠接收到的資料與Zookeeper跟蹤的偏移之間的不一致。因此,在第二種方法中,我們使用不使用Zookeeper的簡單Kafka API。在其檢查點checkpoint內,Spark Streaming跟蹤偏移量。這消除了Spark Streaming和Zookeeper / Kafka之間的不一致性,因此Spark Streaming每次記錄都會在發生故障時有效地接收一次。

請注意,這種方法的一個缺點是它不會更新Zookeeper中的偏移量,因此基於Zookeeper的Kafka監控工具將不會顯示進度。但是,您可以在每個批次中訪問由此方法處理的偏移量,並自己更新Zookeeper

優缺點對比其實很明顯:

直接讀取方式沒有專門的receivers,降低記憶體使用,生產中能將記憶體從10G降低到2-4G

直接讀取的qud, offset的維護需要手動開發,不能通過監控zookeeper來監控offset進度

相關推薦

sparkStreaming讀取kafka方式

omap consumer 屬於 output 滿足 asi keep require tag 概述 Spark Streaming 支持多種實時輸入源數據的讀取,其中包括Kafka、flume、socket流等等。除了Kafka以外的實時輸入源,由於我們的業務場景沒有涉及

sparkstreamingkafka整合方式

-1,基於接收者Receiver-based的方法運算元:KafkaUtils.createStream 方法:PUSH,從topic中去推送資料,將資料推送過來 API:呼叫的Kafka高階API 效果:SparkStreaming中的Receivers,恰好Kafka有釋出/訂閱 ,然而:此種方式企業不常

sparkstreamingkafka整合方式(最全)

-1,基於接收者的方法 運算元:KafkaUtils.createStream 方法:PUSH,從topic中去推送資料,將資料推送過來 API:呼叫的Kafka高階API 效果:SparkStreaming中的Receivers,恰好Kafka有釋出/

P5.JS 手繪碼繪方式“運動”主題作品的對比研究

這次作業我針對的主題是“斐波那契螺旋線” 背景知識:斐波那契螺旋線,也稱“黃金螺旋”,是根據斐波那契數列畫出來的螺旋曲線,自然界中存在許多斐波那契螺旋線的圖案,是自然界最完美的經典黃金比例。作圖規則是在以斐波那契數為邊的正方形拼成的長方形中畫一個90度的扇形,連起來的弧線就是斐波那契螺旋線。在這

fastDFS+LibreOffice多檔案上傳(二)後端部分:檔案資訊轉json字串儲存資料庫(Gsonorg.json方式)

需要注意的地方: 1)如果你複製我的程式碼到你的程式上報錯,可以看看我第一篇文章實體類跟配置檔案的設定:https://blog.csdn.net/qq_36688143/article/details/84162924 第二篇檔案上傳前端頁面的程式碼: https://blog.c

智聯招聘抓取---scrapy框架requests庫方式實現

#首先分析目標站點,分析得出結果是在json接口裡,然後抓取企業資訊需要再次請求頁面進行抓取 #1.直接requests請求進行抓取儲存 ##需要注意點: 可能不同企業單頁排版不一樣,需要判斷採取不同形式 儲存為csv檔案注意格式,保證資料表格不換行需要新增

獲取後臺資料使用JQ-AJAX Vue-Axios 方式的使用對比

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content=

遞迴迭代方式實現歸併排序(Java版)

遞迴版 package MergeSort; import Utils.SortUtils; /** * 歸併排序遞迴版 * @author liguodong */ pub

基於陣列連結串列方式實現棧

棧是一種先進後出的資料結構,在實際程式設計棧有很廣泛的用處,Java棧已經幫我們實現好了stack類。 實現棧的兩種方式,基於陣列實現和基於連結串列實現。 1.stack介面 public interface StackADT { //入棧操作 public voi

手繪碼繪方式創作“運動”主題

藝術的展現形式有多種多樣,繪畫的方式也有很多。這篇文章主要來比較一下手繪與碼繪。 什麼是碼繪?用程式語言即程式碼來畫畫。一般人更多接觸到的是手繪,只有從事或研究相關方面的才會比較瞭解碼繪。聽說現在的藝術生都要學點程式設計呢。 上面兩張就是用processing作的

虛擬機器下 solr7.1 cloud 叢集搭建 (手動解壓官方指令碼方式

準備工作:   vmware workstation 12,OS使用的是ubuntu16.04,三臺虛擬機器搭建一個solr叢集,zookeeper共用這三臺虛擬機器組成zookeeper叢集。   zookeeper的版本為3.4.10,solr版本為7.1,不使用

SpringMvc 上傳excel(註解非註解方式

1、第一種方式: A:JSP頁面: <form name="importForm" action="${ctx }/service/userService/BatchImport.do" method="post" enctype="multipar

用陣列連結串列方式實現佇列

手寫陣列實現佇列 1 int queue[20]; 2 int front,rear; 3 4 void clear() 5 { 6 front = rear = -1; 7 } 8 9 int size() 10 { 11 return (rear-front)

Linux(CentOS)安裝Node.JSnpm的方式(yum安裝原始碼安裝)

宣告版本: Linux版本:CentOS release 6.9 / 64位 yum安裝 yum安裝是將yum源中的rpm包下載到本地,安裝這個rpm包。這個rpm包是別人編譯安裝好的二進位制包。這種方式方便快捷,特別是不用考慮包依賴。 0、

通過JVM記憶體模型深入理解值傳遞引用傳遞方式

值傳遞和引用傳遞分析Java中資料型別分為兩大類:基本型別和引用型別(也就是物件型別)。基本型別:boolean、char、byte、short、int、long、float、double引用型別:類、介面、陣列因此,變數型別也可分為兩大類:基本型別和引用型別。在分析值傳遞

Oracle資料庫的匯入匯出的方式

首先,我們匯入匯出資料,肯定是要通過oracle自帶的可執行程式來完成資料的匯入匯出工作,imp.exe 和exp.exe這兩個可執行檔案都放在oracle安裝目錄下的BIN目錄下。 1.以cmd命令操作符匯入匯出 先以cmd命令操作符來演示,比如我的B

Android ScrollView監聽滑動到頂部底部的方式

import android.content.Context; import android.util.AttributeSet; import android.widget.ScrollView; /** * 監聽ScrollView滾動到頂部或者底部做相關事件攔截 */ public class S

C++使用迭代遞迴方式實現連結串列逆序演算法

1.連結串列逆序的兩種演算法       C++實現一個連結串列逆序演算法 2.連結串列逆序演算法實現原理 如A->B->C->D->E,一般會有以下兩種思路,如下 思路1: 先取出連結串列的最後一個E,然後將E作為新連結串列的頭, 現在狀

傳智播客JNI第七講 – JNI中的全域性引用/區域性引用/弱全域性引用、快取jfieldIDjmethodID的方式

 講解JNI中的全域性引用/區域性引用/弱全域性引用、快取jfieldID和jmethodID的兩種方式,並編寫兩種快取

C#簡單爬取資料(.NET使用HTML解析器ESoup正則方式匹配資料)

一、獲取資料 想弄一個數據庫,由於需要一些人名,所以就去百度一下,然後發現了360圖書館中有很多人名 然後就像去複製一下,發現複製不了,需要登陸 此時f12檢視原始碼是可以複製的,不過就算可以複製想要插入資料也是很麻煩的。既然複製走不通,於是我抱著探索知識的精神,打開了Visual Studio 首先我