1. 程式人生 > >Spark取到Kafka,出現ZK和Kafka offset不一致

Spark取到Kafka,出現ZK和Kafka offset不一致

在專案中用到Spark Streaming讀取Kafka,應用的是Kafka的low level的API因此手動的把Offset儲存到ZK(每次執行成功後,才更新zk中的offset資訊)當中,但是如果出現Kafka出現網路問題或者ZK沒有寫入到情況就會出現ZK的offset和Kafka的offset不一致。此時就要對比Kafka和ZK中的Offset

PS:另外spark中也可以做checkpoint來儲存state

邏輯:
如果ZK中的offset小於 EarliestOffset 大於LastestOffset說明ZK中的offset已經失效,把ZK中的offset更新為EarliestOffset;如果ZK的offset在 EarliestOffset 大於LastestOffset之間那麼以ZK的offset為準

KafkaUtil (SimpleConsumer從Kafka讀取offset)

public class KafkaUtil implements
Serializable {
private static final long serialVersionUID = -7708717328840L; private static KafkaUtil kafkaUtil = null; private KafkaUtil() { } public static KafkaUtil getInstance() { if (kafkaUtil == null) { synchronized (KafkaUtil.class) { if
(kafkaUtil == null) { kafkaUtil = new KafkaUtil(); } } } return kafkaUtil; } /** * 從brokerList中獲取host * * @param brokerList * @return */ public String[] getHostFromBrokerList(String brokerList) { String[] brokers = brokerList.split(","); for (int i = 0; i < brokers.length; i++) { brokers[i] = brokers[i].split(":")[0]; } return brokers; } /** * 從brokerList中獲取port * * @param brokerList * @return */ public Map<String, Integer> getPortFromBrokerList(String brokerList) { Map<String, Integer> portMap = new HashMap<String, Integer>(); String[] brokers = brokerList.split(","); for (int i = 0; i < brokers.length; i++) { String host = brokers[i].split(":")[0]; Integer port = Integer.valueOf(brokers[i].split(":")[1]); portMap.put(host, port); } return portMap; } public KafkaTopicOffset topicAndMetadataRequest(String brokerList, String topic) { List<String> topics = Collections.singletonList(topic); TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics); KafkaTopicOffset kafkaTopicOffset = new KafkaTopicOffset(topic); String[] hosts = getHostFromBrokerList(brokerList); Map<String, Integer> portMap = getPortFromBrokerList(brokerList); for (String host : hosts) { SimpleConsumer simpleConsumer = null; try { simpleConsumer = new SimpleConsumer(host, portMap.get(host), Constant.TIME_OUT, Constant.BUFFERSIZE, Constant.groupId); kafka.javaapi.TopicMetadataResponse response = simpleConsumer.send(topicMetadataRequest); List<TopicMetadata> topicMetadatas = response.topicsMetadata(); for (TopicMetadata metadata : topicMetadatas) { for (PartitionMetadata partitionMetadata : metadata.partitionsMetadata()) { kafkaTopicOffset.getLeaderList().put(partitionMetadata.partitionId(), partitionMetadata.leader().host()); kafkaTopicOffset.getOffsetList().put(partitionMetadata.partitionId(), 0L); } } } catch (Exception e) { e.printStackTrace(); } finally { if (simpleConsumer != null) { simpleConsumer.close(); } } } return kafkaTopicOffset; } /** * 從Kafka取出某個topic中某個partition的最小或者最大offset * * @param brokerList * @param topic * @return */ public KafkaTopicOffset getOffset(String brokerList, String topic, String flag) { KafkaTopicOffset kafkaTopicOffset = topicAndMetadataRequest(brokerList, topic); String[] hosts = getHostFromBrokerList(brokerList); Map<String, Integer> portMap = getPortFromBrokerList(brokerList); for (String host : hosts) { Iterator iterator = kafkaTopicOffset.getOffsetList().entrySet().iterator(); SimpleConsumer simpleConsumer = null; try { simpleConsumer = new SimpleConsumer(host, portMap.get(host), Constant.TIME_OUT, Constant.BUFFERSIZE, Constant.groupId); while (iterator.hasNext()) { Map.Entry<Integer, Long> entry = (Map.Entry<Integer, Long>) iterator.next(); int partitionId = entry.getKey(); //判斷當前的host是否為leader if (!kafkaTopicOffset.getLeaderList().get(partitionId).equals(partitionId)) { continue; } TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); if (flag.equals(Constant.EARLIEST_OFFSET)) { requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1)); } else if (flag.equals(Constant.LATEST_OFFSET)) { requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)); } OffsetRequest offsetRequest = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), Constant.groupId); OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest); long[] offset = offsetResponse.offsets(topic, partitionId); if (offset.length > 0) { kafkaTopicOffset.getOffsetList().put(partitionId, offset[0]); } } } catch (Exception e) { e.printStackTrace(); } finally { if (simpleConsumer != null) { simpleConsumer.close(); } } } return kafkaTopicOffset; } }

相關推薦

SparkKafka出現ZKKafka offset一致

在專案中用到Spark Streaming讀取Kafka,應用的是Kafka的low level的API因此手動的把Offset儲存到ZK(每次執行成功後,才更新zk中的offset資訊)當中,但是如果出現Kafka出現網路問題或者ZK沒有寫入到情況就會出現ZK

MongoDB find命令匹配資料匹配內容檢索條件一致

  最後一位隨便輸一個數,就匹配這一條。 問題描述:今天計劃將tweet_id設定為集合的唯一索引,出現一條資料報錯。報錯對應的tweet_id為“255837612277911555”,在pymongo查詢這條資料發現出現的結果和查詢內容最後一位不一致。於是修改最後一位

C#中出現Inconsistent accessibility返回型別一致問題

C#中,出現Inconsistent accessibility返回型別不一致問題 2017年03月06日 14:59:01 QianSong_Promise 閱讀數:499 標籤: C#返回一致性屬性 更多 個人分類: C#

Kafka工作流程-KafkaClusterKafka 高可靠性儲存

1.KafkaCluster     在使用 Kafka 低階消費者時,可以通過 KafkaCluster 類實現 offset 向 ZooKeeper 的提交 和獲取。     Kafka 協議非常簡單,只有六個核心客戶端請求 API:         元資料(Met

微信小程式:點選商品+出現數量-

問題描述: 商品列表中只有“+”,點選之後出現“-1+”,可以對數量進行增減, 效果 .wxml <view class="add-box">

當執行npm publish 時出現unauthorized is not in the npm registry

當執行npm publish,發生瞭如下錯誤: appledeMacBook-Pro:nini-react apple$ npm publish npm ERR! publish Failed PUT 401 npm ERR! code E401 npm ERR! 404

css 對div用hover設定border出現抖動div走位問題解決方法

樣式設定 : div:hover { border:1px solid red;} 當滑鼠移動到div時,產生抖動和偏移。 產生的原因: 是因為設定border時設定了1px邊框,多出的這1px,與其它元素產生了擠壓,

jq 點編輯出現input框可進行編輯出現儲存取消

首先是效果圖 下面是html前臺的佈局程式碼 再是jq的程式碼 <script> //頁面載入時將input框和儲存和取消隱藏 $(document).ready(fu

Unity NavMesh尋路檢測的bug(或者特性)爬坡卡住問題。(角色高度網格高度一致造成)

Unity專案,由於人物移動時一般用搖桿或者方向鍵控制, 需要有八方向方式控制朝向,  所以沒有用 NavMesh Agent, 而是自己控制人物方向移動,然後貼合地面。 用了NavMesh.CalculatePath只是用於目標點的尋路, 尋找出路經後自己計算實現移動。

使用maven建立ssm專案時出現dataSurce的bean建立了的問題

啟動專案時,專案啟動不了,直接報錯,並說是資料來源的問題,但是沒有實際的錯誤提示,說datasource建立失敗,後面還加上一些找不到類的錯誤 18-Oct-2018 17:03:20.952 嚴重 [RMI TCP Connection(3)-127.0.0.1] org.spring

用cmd執行記事本寫的java檔案以及jdk版本執行版本一致原因及其解決辦法

1.首先新建一個記事本,檔名改為Welcome.java拓展名也要改!(屬性裡面改為顯示拓展名,這樣才可以通過重新命名的方式改檔案個格式) public class Welcome{ public static void main(String[] args){ System.out.pr

基因資料處理122之SSWSparkSW評分一致query為Q9

基因資料處理系列 1.解釋 RT,但是順序一致 2.程式碼: [email protected]:~/disk2/xubo/project/alignment/SparkSW/SparkSW20161114/alluxio-1.3.

解決echarts的多個折現資料出現座標值對上的問題

當出現多個折現資料,echarts可以配置stack值使用堆積值還是單個值 option = { noDataLoadingOption: { text: '暫無資料', effect: 'bubble', effectOption: {effec

ROS Base pathSource space一致問題修改檔名後無法make問題,catkin_make報錯問題

在一次在ROS 的學習實踐中,將Ros工程目錄名稱更改了,source後,出現了base path和Source space不一致問題:報錯提示:Base path: /home/pot/catkin_ws_topSource space: /home/pot/catkin_

多行轉多列行數列數確定

原始需求,有2表如下 SQL> select * from mas; TO TOOLNAME -- ---------- 01 包裹 02 信函 03 掛號信 04 中國速遞 05 EMS 06 DHL 6 rows selected. SQL> select * from putdt; SEN

catkin_make報錯: ROS Base pathSource space一致問題

問題描述: #### #### Running command: "make cmake_check_build_system" in "/home/zhumeiqiang/documents/ros/build" #### #### #### Running comman

ArcGIS Engine開發.NET4.0降為.NET3.5後出現的”試圖載入格式正確的程式“錯誤解決方法

有一個軟體開發專案,編譯器是VS2010,之前用的目標框架是.NET4.0,現要換在.NET3.5,結果生成出現以下錯誤: 錯誤    254    未能載入檔案或程式集“file:///C:/Program Files (x86)/ArcGIS/DeveloperKit1

Android studio匯入工程java檔案出現紅色Jgradle外掛與gradle版本一致 解決辦法

     在這個程式碼搬運的時代,合理運用開源優秀程式碼,已經成為了每一個程式設計師必不可少的技能。      我相信大家在匯入第三方module或者第三方工程的時候,都遇到過,開啟的工程JAVA檔案

dfdu 結果一致差別很大df -h看到是利用率100%

今天發現有臺測試伺服器的根目錄利用率100%,根目錄120G,但是du -sh發現所有資料夾加在一起也不到40G。 一共嘗試了4種方法,終於解決。 1、網上的說法都是說有已經刪除的檔案,但是程序仍在使用,使用lsof | grep deleted檢視已刪除但

crontab執行時間系統時間一致

google 服務器 shanghai share crontab執行時間和系統時間不一致