1. 程式人生 > >kafka手動修改消費者偏移量

kafka手動修改消費者偏移量

1.建立一個測試主題:

[[email protected] bin]#./kafka-topics.sh --zookeeper snn:2181 --topic offset-test --partitions 2 --replication-factor 2 –create

檢查主題的設定情況:

[[email protected] bin]#./kafka-topics.sh --zookeeper snn:2181 --topic offset-test –describe

2.向主題中寫入資料

[[email protected] bin]# ./kafka-console-producer.sh --broker-list snn:6667 --topic offset-test

3.消費主題

[[email protected] bin]# ./kafka-console-consumer.sh --bootstrap-server snn:6667 --group offsettest --topic offset-test –from-beginning

      

4.檢視消費者消費偏移量

[[email protected] bin]#./kafka-consumer-groups.sh --bootstrap-server snn:6667 --group offsettest –describe

5.設定為最初偏移量:

[

[email protected] bin]#./kafka-consumer-groups.sh --bootstrap-server snn:6667 --group offsettest --topic offset-test --reset-offsets --to-earliest –execute

重新讀取資料:

6.設定任意偏移量:

[[email protected] bin]# ./kafka-consumer-groups.sh --bootstrap-server snn:6667 --group offsettest --topic offset-test --reset-offsets --to-offset 3 –execute

7.設定最近偏移量

[[email protected] bin]# ./kafka-consumer-groups.sh --bootstrap-server snn:6667 --group offsettest --topic offset-test --reset-offsets --to-latest --execute

相關推薦

kafka手動修改消費者偏移

1.建立一個測試主題: [[email protected] bin]#./kafka-topics.sh --zookeeper snn:2181 --topic offset-test --partitions 2 --replication-factor 2

Kafka(七)消費者偏移

sof () 取模 失敗 data 兩種方法 保存 庫存 num 在Kafka0.9版本之前消費者保存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消費者不在保存偏移量到zooke

Kafka_Kafka 消費者 偏移 與 積壓 查詢指令碼 kafka-consumer-groups.sh

本文章對應的 kafka 版本是  kafka_2.11-0.10.0.1 版本號的含義 scala 2.11 kafka 0.10.0.1 背景:    kafka 0.9 及以上 有了一個大版本變化, 主要有以下幾個方面:   1.kafka-cli

zookeeper上修改kafka消費組的偏移

[[email protected] bin]$ zookeeper-shell.sh 192.168.0.1:2181 Connecting to 192.168.0.1:2181 Wel

分散式訊息系統:Kafka(五)偏移

5、偏移量提交 5.1 偏移量 (1)新舊版本偏移量的變化   在Kafka0.9版本之前消費者儲存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消費者不在儲

sparkStreaming 與fafka直接方式 進行消費者偏移的保存如redis 裏面 避免代碼改變與節點重啟後的數據丟失與序列化問題

create term tex ria streaming 保存 else config cal import java.util import kafka.common.TopicAndPartition import kafka.message.Messag

如何手動更新Kafka中某個Topic的偏移

轉載: 我們在使用consumer消費資料時,有些情況下我們需要對已經消費過的資料進行重新消費,這裡介紹kafka中兩種重新消費資料的方法。 1. 修改offset 我們在使用consumer消費的時候,每個topic會產生一個偏移量,這個偏移量保證我們消費的訊息順

kafka重置到最新offset偏移

ray IT 每次 lis 設置 通過 默認 cname 解決問題 小弟近日用kafka測試傳輸數據設置的單消費者,不料消費者頭天晚上就掛掉了 ,重啟消費者,因為auto.offset.reset 默認為latest,所以消費者從昨天晚上的數據接著消費,因為差了一晚上了,消

SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移的兩種方式

Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka

mysql主鍵自增值和偏移的檢視和修改

1、檢視mysql自增值和偏移量 show variables like '%increment%'; auto_increment_increment=1  -- 自增倍數是1 auto_increment_offset=1 -- 偏移量是1 上邊這是一般的設定,每次

Spark Streaming管理Kafka偏移

前言 為了讓Spark Streaming消費kafka的資料不丟資料,可以建立Kafka Direct DStream,由Spark Streaming自己管理offset,並不是存到zookeeper。啟用S​​park Streaming的 checkpoints是儲存偏移量的最簡單方法,因為它可以

Spark Streaming 之 Kafka 偏移管理

本文主要介紹 Spark Streaming 應用開發中消費 Kafka 訊息的相關內容,文章著重突出了開發環境的配置以及手動管理 Kafka 偏移量的實現。 一、開發環境 1、元件版本 CDH 叢集版本:6.0.1 Spark 版本:2.2.0 Kafka 版本:1.0.1 2、M

使用redis儲存kafka偏移

使用redis儲存kafka的偏移量 轉自:Lu_Xiao_Yue 使用Redis來記錄偏移量,以前用receive方式時,使用zookeeper儲存偏移量,不用自己儲存偏移量,使用直連方式可以自己儲存偏移量,更加靈活。在直連方式中,儲存偏移量可以使用zookeeper,也可以使用mys

kafka同步非同步消費和訊息的偏移(四)

1. 消費者位置(consumer position) 因為kafka服務端不儲存訊息的狀態,所以消費端需要自己去做很多事情。我們每次呼叫poll()方法他總是返回已經儲存在生產者佇列中還未被消費者消費的訊息。訊息在每一個分割槽中都是順序的,那麼必然可以通過一

如何管理Spark Streaming消費Kafka偏移(二)

上篇文章,討論了在spark streaming中管理消費kafka的偏移量的方式,本篇就接著聊聊上次說升級失敗的案例。 事情發生一個月前,由於當時我們想提高spark streaming程式的並行處理效能,於是需要增加kafka分割槽個數,,這裡需要說下,在新版本sp

如何管理Spark Streaming消費Kafka偏移(三)

前面的文章已經介紹了在spark streaming整合kafka時,如何處理其偏移量的問題,由於spark streaming自帶的checkpoint弊端非常明顯,所以一些對資料一致性要求比較高的專案裡面,不建議採用其自帶的checkpoint來做故障恢復。 在sp

Spark+Kafka的Direct方式將偏移傳送到Zookeeper的實現

Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Receiver的方法要高效。但是因為是Spar

Spark+Kafka的Direct方式將偏移傳送到Zookeeper實現

Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Rece

sparkstreaming整合kafka引數設定,message偏移寫入redis

kafka高階資料來源拉取到spark,偏移量自我維護寫入到redis,建立redis連線池。需要匯入<groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-

Kafka 訊息偏移的維護

Kafka是大資料領域常用的訊息佇列,其高效的吞吐量和分散式容錯等特性是其收到青睞的重要原因。 kafka訊息的位置 用好Kafka,維護其訊息偏移量對於避免訊息的重複消費與遺漏消費,確保訊息的Exactly-once是至關重要的。 kafka的訊息所