Spark取到Kafka,出現ZK和Kafka offset不一致
在專案中用到Spark Streaming讀取Kafka,應用的是Kafka的low level的API因此手動的把Offset儲存到ZK(每次執行成功後,才更新zk中的offset資訊)當中,但是如果出現Kafka出現網路問題或者ZK沒有寫入到情況就會出現ZK的offset和Kafka的offset不一致。此時就要對比Kafka和ZK中的Offset
PS:另外spark中也可以做checkpoint來儲存state
- Using checkpoints
- Keeping track of the offsets that have been processed.
另外it takes time for Spark to prepare them and store them
checkpoint比較耗時(平均時間3S做checkpoint)
牆裂推薦:
邏輯:
如果ZK中的offset小於 EarliestOffset 大於LastestOffset說明ZK中的offset已經失效,把ZK中的offset更新為EarliestOffset;如果ZK的offset在 EarliestOffset 大於LastestOffset之間那麼以ZK的offset為準
KafkaUtil (SimpleConsumer從Kafka讀取offset)
public class KafkaUtil implements Serializable {
private static final long serialVersionUID = -7708717328840L;
private static KafkaUtil kafkaUtil = null;
private KafkaUtil() {
}
public static KafkaUtil getInstance() {
if (kafkaUtil == null) {
synchronized (KafkaUtil.class) {
if (kafkaUtil == null) {
kafkaUtil = new KafkaUtil();
}
}
}
return kafkaUtil;
}
/**
* 從brokerList中獲取host
*
* @param brokerList
* @return
*/
public String[] getHostFromBrokerList(String brokerList) {
String[] brokers = brokerList.split(",");
for (int i = 0; i < brokers.length; i++) {
brokers[i] = brokers[i].split(":")[0];
}
return brokers;
}
/**
* 從brokerList中獲取port
*
* @param brokerList
* @return
*/
public Map<String, Integer> getPortFromBrokerList(String brokerList) {
Map<String, Integer> portMap = new HashMap<String, Integer>();
String[] brokers = brokerList.split(",");
for (int i = 0; i < brokers.length; i++) {
String host = brokers[i].split(":")[0];
Integer port = Integer.valueOf(brokers[i].split(":")[1]);
portMap.put(host, port);
}
return portMap;
}
public KafkaTopicOffset topicAndMetadataRequest(String brokerList, String topic) {
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);
KafkaTopicOffset kafkaTopicOffset = new KafkaTopicOffset(topic);
String[] hosts = getHostFromBrokerList(brokerList);
Map<String, Integer> portMap = getPortFromBrokerList(brokerList);
for (String host : hosts) {
SimpleConsumer simpleConsumer = null;
try {
simpleConsumer = new SimpleConsumer(host, portMap.get(host), Constant.TIME_OUT, Constant.BUFFERSIZE, Constant.groupId);
kafka.javaapi.TopicMetadataResponse response = simpleConsumer.send(topicMetadataRequest);
List<TopicMetadata> topicMetadatas = response.topicsMetadata();
for (TopicMetadata metadata : topicMetadatas) {
for (PartitionMetadata partitionMetadata : metadata.partitionsMetadata()) {
kafkaTopicOffset.getLeaderList().put(partitionMetadata.partitionId(), partitionMetadata.leader().host());
kafkaTopicOffset.getOffsetList().put(partitionMetadata.partitionId(), 0L);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (simpleConsumer != null) {
simpleConsumer.close();
}
}
}
return kafkaTopicOffset;
}
/**
* 從Kafka取出某個topic中某個partition的最小或者最大offset
*
* @param brokerList
* @param topic
* @return
*/
public KafkaTopicOffset getOffset(String brokerList, String topic, String flag) {
KafkaTopicOffset kafkaTopicOffset = topicAndMetadataRequest(brokerList, topic);
String[] hosts = getHostFromBrokerList(brokerList);
Map<String, Integer> portMap = getPortFromBrokerList(brokerList);
for (String host : hosts) {
Iterator iterator = kafkaTopicOffset.getOffsetList().entrySet().iterator();
SimpleConsumer simpleConsumer = null;
try {
simpleConsumer = new SimpleConsumer(host, portMap.get(host), Constant.TIME_OUT, Constant.BUFFERSIZE, Constant.groupId);
while (iterator.hasNext()) {
Map.Entry<Integer, Long> entry = (Map.Entry<Integer, Long>) iterator.next();
int partitionId = entry.getKey();
//判斷當前的host是否為leader
if (!kafkaTopicOffset.getLeaderList().get(partitionId).equals(partitionId)) {
continue;
}
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
if (flag.equals(Constant.EARLIEST_OFFSET)) {
requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
} else if (flag.equals(Constant.LATEST_OFFSET)) {
requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
}
OffsetRequest offsetRequest = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), Constant.groupId);
OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
long[] offset = offsetResponse.offsets(topic, partitionId);
if (offset.length > 0) {
kafkaTopicOffset.getOffsetList().put(partitionId, offset[0]);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (simpleConsumer != null) {
simpleConsumer.close();
}
}
}
return kafkaTopicOffset;
}
}
相關推薦
Spark取到Kafka,出現ZK和Kafka offset不一致
在專案中用到Spark Streaming讀取Kafka,應用的是Kafka的low level的API因此手動的把Offset儲存到ZK(每次執行成功後,才更新zk中的offset資訊)當中,但是如果出現Kafka出現網路問題或者ZK沒有寫入到情況就會出現ZK
MongoDB find命令匹配資料,匹配內容和檢索條件不一致
最後一位隨便輸一個數,就匹配這一條。 問題描述:今天計劃將tweet_id設定為集合的唯一索引,出現一條資料報錯。報錯對應的tweet_id為“255837612277911555”,在pymongo查詢這條資料發現出現的結果和查詢內容最後一位不一致。於是修改最後一位
C#中,出現Inconsistent accessibility返回型別不一致問題
C#中,出現Inconsistent accessibility返回型別不一致問題 2017年03月06日 14:59:01 QianSong_Promise 閱讀數:499 標籤: C#返回一致性屬性 更多 個人分類: C#
Kafka工作流程-KafkaCluster和Kafka 高可靠性儲存
1.KafkaCluster 在使用 Kafka 低階消費者時,可以通過 KafkaCluster 類實現 offset 向 ZooKeeper 的提交 和獲取。 Kafka 協議非常簡單,只有六個核心客戶端請求 API: 元資料(Met
微信小程式:點選商品+,出現數量和-
問題描述: 商品列表中只有“+”,點選之後出現“-1+”,可以對數量進行增減, 效果 .wxml <view class="add-box">
當執行npm publish 時,出現unauthorized 和 is not in the npm registry
當執行npm publish,發生瞭如下錯誤: appledeMacBook-Pro:nini-react apple$ npm publish npm ERR! publish Failed PUT 401 npm ERR! code E401 npm ERR! 404
css 對div用hover設定border,出現抖動和div走位問題,解決方法
樣式設定 : div:hover { border:1px solid red;} 當滑鼠移動到div時,產生抖動和偏移。 產生的原因: 是因為設定border時設定了1px邊框,多出的這1px,與其它元素產生了擠壓,
jq 點編輯出現input框可進行編輯,出現儲存和取消
首先是效果圖 下面是html前臺的佈局程式碼 再是jq的程式碼 <script> //頁面載入時將input框和儲存和取消隱藏 $(document).ready(fu
Unity NavMesh尋路檢測的bug(或者特性),爬坡卡住問題。(角色高度和網格高度不一致造成)
Unity專案,由於人物移動時一般用搖桿或者方向鍵控制, 需要有八方向方式控制朝向, 所以沒有用 NavMesh Agent, 而是自己控制人物方向移動,然後貼合地面。 用了NavMesh.CalculatePath只是用於目標點的尋路, 尋找出路經後自己計算實現移動。
使用maven建立ssm專案時,出現dataSurce的bean建立不了的問題
啟動專案時,專案啟動不了,直接報錯,並說是資料來源的問題,但是沒有實際的錯誤提示,說datasource建立失敗,後面還加上一些找不到類的錯誤 18-Oct-2018 17:03:20.952 嚴重 [RMI TCP Connection(3)-127.0.0.1] org.spring
用cmd執行記事本寫的java檔案,以及jdk版本和執行版本不一致原因及其解決辦法
1.首先新建一個記事本,檔名改為Welcome.java拓展名也要改!(屬性裡面改為顯示拓展名,這樣才可以通過重新命名的方式改檔案個格式) public class Welcome{ public static void main(String[] args){ System.out.pr
基因資料處理122之SSW和SparkSW評分不一致,query為Q9
基因資料處理系列 1.解釋 RT,但是順序一致 2.程式碼: [email protected]:~/disk2/xubo/project/alignment/SparkSW/SparkSW20161114/alluxio-1.3.
解決echarts的多個折現資料出現座標和值對不上的問題
當出現多個折現資料,echarts可以配置stack值使用堆積值還是單個值 option = { noDataLoadingOption: { text: '暫無資料', effect: 'bubble', effectOption: {effec
ROS Base path和Source space不一致問題,修改檔名後無法make問題,catkin_make報錯問題
在一次在ROS 的學習實踐中,將Ros工程目錄名稱更改了,source後,出現了base path和Source space不一致問題:報錯提示:Base path: /home/pot/catkin_ws_topSource space: /home/pot/catkin_
多行轉多列,行數和列數不確定
原始需求,有2表如下 SQL> select * from mas; TO TOOLNAME -- ---------- 01 包裹 02 信函 03 掛號信 04 中國速遞 05 EMS 06 DHL 6 rows selected. SQL> select * from putdt; SEN
catkin_make報錯: ROS Base path和Source space不一致問題,
問題描述: #### #### Running command: "make cmake_check_build_system" in "/home/zhumeiqiang/documents/ros/build" #### #### #### Running comman
ArcGIS Engine開發,.NET4.0降為.NET3.5後,出現的”試圖載入格式不正確的程式“錯誤解決方法
有一個軟體開發專案,編譯器是VS2010,之前用的目標框架是.NET4.0,現要換在.NET3.5,結果生成出現以下錯誤: 錯誤 254 未能載入檔案或程式集“file:///C:/Program Files (x86)/ArcGIS/DeveloperKit1
Android studio匯入工程java檔案出現紅色J,gradle外掛與gradle版本不一致 解決辦法
在這個程式碼搬運的時代,合理運用開源優秀程式碼,已經成為了每一個程式設計師必不可少的技能。 我相信大家在匯入第三方module或者第三方工程的時候,都遇到過,開啟的工程JAVA檔案
df和du 結果不一致,差別很大,df -h看到是利用率100%
今天發現有臺測試伺服器的根目錄利用率100%,根目錄120G,但是du -sh發現所有資料夾加在一起也不到40G。 一共嘗試了4種方法,終於解決。 1、網上的說法都是說有已經刪除的檔案,但是程序仍在使用,使用lsof | grep deleted檢視已刪除但
crontab執行時間和系統時間不一致
google 服務器 shanghai share crontab執行時間和系統時間不一致