1. 程式人生 > >初識 Kafka+JAVA程式例項

初識 Kafka+JAVA程式例項

本文是從英文的官網摘了翻譯的,用作自己的整理和記錄。水平有限,歡迎指正。版本是: kafka_2.10-0.10.0.0 
  

一、基礎概念

  • 主題:Kafka maintains feeds of messages in categories called topics. 
      
  • 生產者:We’ll call processes that publish messages to a Kafka topic producers. 
         
  • 消費者:We’ll call processes that subscribe to topics and process the feed of published messages consumers. 
         
  • 代理(Broker):Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

     生產者通過網路將訊息傳送到Kafka叢集上,叢集依次(輪流)服務訊息到達消費者。 Kafka執行在一個叢集中,叢集中的每一個伺服器就叫代理。 
    這裡寫圖片描述

  • Partition:Partition 是物理上的概念,每個 Topic 包含一個或多個 Partition。

主題和日誌

  一個主題是命名或分類釋出的訊息。每一個主題,Kafka持有一個分割槽日誌,看起來像下面圖片。  
  這裡寫圖片描述 
  每一個Partition都是有序的,固定長度的訊息佇列一直不斷增加到–一個提交日誌。訊息在Partition內分配了順序的id叫偏移量,這個偏移量在分割槽中唯一標識每個訊息的。 
  Kafka儲存所有(一段時間內的-可配置)已經發布的訊息-無論它們是否已經被消費。例如,如果日誌保留被設定為兩天,那麼在一個訊息釋出後,兩天內它是可用的,兩天後它將被丟棄到空閒空間

事實上,元資料保留在每個消費程序中,是基於消費程序在日誌中的位置,該位置稱為“偏移量”(In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the “offset”.)。這個偏移量被消費者控制:正常的消費者讀取訊息時,線性增加偏移量,但事實上消費者可以以任何它順序的方式來控制。例如:一個消費者可以重置到以前的偏移量位置來重新處理。 
  這種組合的特點意味著Kafka的消費者是很廉價的-消費者程序可以隨時增加減少,對叢集和其它消費者程序沒有任何影響。例如:你可以使用命令列工具輸出任何主題的內容,而不改變任何現有的消費者所消耗的。 
  日誌中的分割槽服務幾個目的。首先,日誌的規模大小可以調整,遠不是隻有一個在一個伺服器上。每個單獨的分割槽都必須安裝在主機上的伺服器上,一個主題可以有許多分割槽,所以它可以處理任意數量的資料。第二,它們都是獨立相互平行的。 

Distribution(分佈)

日誌的分割槽分佈在Kafka叢集中的伺服器上,每個伺服器處理資料,並請求分割槽內容的副本。為了容錯,每個分割槽的副本數量是可以通過伺服器設定的。 
  
每個分割槽都有一個伺服器它充當“leader”和0到更多的伺服器,作為“followers”。leader處理所有的讀寫請求,而followers被動地複製的leader。如果leader失敗,其中一個“followers”將自動成為新的“leader”。 
 

Producers

生產者將資料釋出到他們所選擇的主題。生產者負責選擇那個訊息分配到那個主題的哪個partition。至於選擇哪個分割槽可以簡單的迴圈方式達到負載均衡,也可以者根據語義功能來分割槽。

Consumers

  每個消費者把自己標示到一個消費組,當每個訊息釋出到主題後,訊息在投遞到每個訂閱消費組一個消費例項。消費者例項可以在不同的程序或不同的機器上。 
  如果所有的消費者例項都有相同的消費組,那麼這就像一個傳統的佇列。 
  如果所有的消費者例項都有不同的消費組,那麼這類作品就如釋出訂閱,所有的資訊都被廣播給所有的消費者。 
  然而,更常見的是主題有一個小數量的消費組,每一個為“邏輯訂閱。每個組都是由許多消費例項,為了可擴充套件性和容錯性。 
  Kafka有比傳統訊息系統更強壯的順序保證。 
  傳統的佇列在伺服器上保留順序訊息,如果多個消費者從佇列中消費,然後伺服器將它們儲存的訊息按照順序傳送出去。然而,雖然伺服器按照順序傳送訊息,但是訊息傳遞非同步傳送給消費者,所以訊息到達消費者時可能失序了。這種高效意味著在並行消費過程中,訊息的順序丟失。訊息傳遞系統經常圍繞這個工作,有一個“exclusive consumer“的概念,它只允許一個程序從一個佇列中消耗,但當然這意味著沒有並行性處理的可能性。 
  Kafka做得更好。通過對主題進行分割槽,Kafka是既能保證順序,又能負載均衡的消費。這是通過給主題進行分割槽,然後給消費組,使的每個分割槽都被組內唯一消費程序消費。通過這樣做,我們確保消程序是唯一的讀取那個分割槽,並消費資料的順序。請注意,在一個消費組中,不能有比分割槽更多的消費程序。

Kafka只在一個分割槽中的訊息提供了一個總的順序,而不是在一個主題中的不同分割槽之間的。然而,如果您需要一個完全有序的訊息,這可以通過一個主題和一個分割槽來實現,顯然這將意味著每一個消費組只有一個消費程序。 
 

Guarantees(保證)

Kafka給出了以下保證:

  • 生產者傳送到一個特定主題的分割槽的訊息,將被新增,並且傳送是順序的。
  • 各消費例項看到訊息是順序,並且儲存在日誌裡。
  • 一個主題由N各複製備份,我們將容忍N-1伺服器故障而不丟失任何資訊提交到日誌。 
     

二、程式例項

重要的來了,上面看不懂的沒關係,看程式,最直接。 
假如我們有一個主題叫foo,它有4個分割槽。我建立了兩個消費組GroupA and GroupB 
 這裡寫圖片描述 
其中GroupA有2個消費者,GroupB有4個消費者。 
我們的生產者平均向4個分割槽寫入了內容。例:

package part;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         //The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
        //“所有”設定將導致記錄的完整提交阻塞,最慢的,但最持久的設定。
         props.put("acks", "all");
         //如果請求失敗,生產者也會自動重試,即使設定成0 the producer can automatically retry.
         props.put("retries", 0);

         //The producer maintains buffers of unsent records for each partition. 
         props.put("batch.size", 16384);
         //預設立即傳送,這裡這是延時毫秒數
         props.put("linger.ms", 1);
         //生產者緩衝大小,當緩衝區耗盡後,額外的傳送呼叫將被阻塞。時間超過max.block.ms將丟擲TimeoutException
         props.put("buffer.memory", 33554432);
         //The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes.
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

         //建立kafka的生產者類
         Producer<String, String> producer = new KafkaProducer<String, String>(props);
         //生產者的主要方法
         // close();//Close this producer.
         //   close(long timeout, TimeUnit timeUnit); //This method waits up to timeout for the producer to complete the sending of all incomplete requests.
         //  flush() ;所有快取記錄被立刻傳送
         for(int i = 0; i < 100; i++)
          //這裡平均寫入4個分割槽
             producer.send(new ProducerRecord<String, String>("foo",i%4, Integer.toString(i), Integer.toString(i)));
             producer.close();
    }
}

消費者

package part;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        System.out.println("this is the group part test 1");
        //消費者的組id
        props.put("group.id", "GroupA");//這裡是GroupA或者GroupB

        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");

        //從poll(拉)的回話處理時長
        props.put("session.timeout.ms", "30000");
        //poll的數量限制
     //props.put("max.poll.records", "100");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //訂閱主題列表topic
        consumer.subscribe(Arrays.asList("foo"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                // 正常這裡應該使用執行緒池處理,不應該在這裡處理
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()+"\n");

        }
    }

}

如果GroupA和GroupB都正常啟動,那麼GroupB內4各消費平均消費生產者的訊息資料(這裡每個25個訊息),GroupA內2個消費者各處理50各訊息,每個消費者處理2各分割槽。如果GroupA內一個消費者結束通話,那麼另一個處理所有訊息資料。如果GroupB掛掉一個,那麼將有一個消費者出來處理掛掉沒有處理的訊息資料。 
  以下命令可以修改某主題的分割槽大小。

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic foo --partitions 4
  • 1

三、multi-broker cluster

這裡其實和Zookeeper機制由點類似,也是建立了一個leader和幾個follower。主要的作用還是為了可擴充套件性和容錯性。當集中任意一臺出問題,都可以保證系統的正確和穩定。即使是leader出現問題,它們也可以通過投票的方式產生新leader. 這裡只是簡單說明一下。

在它的官方例子中通過複製原有的配置檔案,在本地建立了偽叢集服務。

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

其中 broker.id 屬性是叢集中唯一的和永久的節點名字,正常應該是一臺機子一個服務。其它兩個是因為偽叢集的原因必須修改。 
讓後啟動這兩臺服務建立偽叢集。模擬了leader失效(被強行kill)後,它還可以正常工作。 
啟動:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &

四、典型應用場景

  1. 監控:主機通過Kafka傳送與系統和應用程式健康相關的指標,然後這些資訊會被收集和處理從而建立監控儀表盤併發送警告。除此之外,LinkedIn還利用Apache Samza實現了一個能夠實時處理事件的富呼叫圖分析系統。
  2. 傳統的訊息: 應用程度使用Kafka作為傳統的訊息系統實現標準的佇列和訊息的釋出—訂閱,例如搜尋和內容提要(Content Feed)。
  3. 分析: 為了更好地理解使用者行為,改善使用者體驗,LinkedIn會將使用者查看了哪個頁面、點選了哪些內容等資訊傳送到每個資料中心的Kafka叢集上,並通過Hadoop進行分析、生成日常報告。
  4. 作為分散式應用程式或平臺的構件(日誌):大資料倉庫解決方案Pinot等產品將Kafka作為核心構件(分散式日誌),分散式資料庫Espresso將其作為內部副本並改變傳播層。

相關推薦

初識 Kafka+JAVA程式例項

本文是從英文的官網摘了翻譯的,用作自己的整理和記錄。水平有限,歡迎指正。版本是: kafka_2.10-0.10.0.0   一、基礎概念主題:Kafka maintains feeds of messages in categories called topics.   生

初識Apache Kafka+JAVA程式例項

  本文是從英文的官網摘了翻譯的,用作自己的整理和記錄。水平有限,歡迎指正。版本是: kafka_2.10-0.10.0.0    一、基礎概念 主題:Kafka maintains feeds of messages in categories c

KafKa Java程式設計例項

KafKa Java程式設計例項 編寫一個能傳送訊息,接收訊息的例項 (1)編寫係數配置:KafkaProperties.java p

Java中利用集合框架模擬鬥地主程式例項

package doudizhuDemo; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.TreeSet; /* 模擬鬥地主 *

Java程式多程序執行模式的例項分析

一般我們在java中執行其它類中的方法時,無論是靜態呼叫,還是動態呼叫,都是在當前的程序中執行的,也就是說,只有一個java虛擬機器例項在執行。而有的時候,我們需要通過java程式碼啟動多個java子程序。這樣做雖然佔用了一些系統資源,但會使程式更加穩定,因為新啟動的程式是在不同的虛擬機器程序中執行

編寫Java程式Kafka叢集生產並消費資料

一.Kafka生產資料 1.預備知識: 1.程式設計環境如下: 01.使用windows的intellij編寫java程式,連線到本地虛擬機器上的kafka叢集,生產和消費資料。 02.一定要注意配置等問題,否則會導致無法連線到zookeeper和

ABAP--一個極好的呼叫外部java程式的Search Help Exit的例項(RFC好例子)

Connecting to an external source via Java Connector (By Bob Billings) In the process of SAP Order Entry (VA01) it became necess

JNI中在被呼叫的C/C++函式中如何訪問Java程式中的類,並編寫應用例項

1、在javah工具生成的C/C++函式宣告中的JNIEnv型別引數和jobject型別引數的介紹 JNIEXPORT void JNICALL Java_jni_TestNative_sayhell

Java經典演算法四十例程式設計詳解+程式例項

JAVA經典演算法40例 【程式1】   題目:古典問題:有一對兔子,從出生後第3個月起每個月都生一對兔子,小兔子長到第四個月後每個月又生一對兔子,假如兔子都不死,問每個月的兔子總數為多少?   1.程式分析:   兔子的規律為數列1,1,2,3,4,6,9,13,....

Java 程式連線 Informix 資料庫方法例項介紹

開您的試用 概述 Informix 是一種應用廣泛的關係型資料庫伺服器,支援多種型別的客戶端連線程式,包括 .Net、Java、PHP 等。對於 Java 程式,Informix 支援兩種 JDBC 供客戶端連線,分別是 IBM Informix JDBC 和 IBM Data Server dr

Apache POI:Java程式讀寫Microsoft Office格式文件——簡單完整例項講解

1.Apache POI簡介             Apache POI是Apache軟體基金會的開放原始碼函式庫,官方名稱為:Apache POI - the Java API for Microsoft Documents,POI提供API給Java程式對Micros

初識Kafka

第一個 物理 生產 應用場景 完全 mes 不同的 規則 cnblogs Kafka是什麽?   是一個分布式消息系統  類JMS消息隊列,結合JMS中的兩種模式,可以有多個消費者主動拉取數據,在JMS中只有點對點模式才有消費者主動拉取數據。  kafka是一個生產-消費

Kafka筆記整理(二):Kafka Java API使用

大數據 Kafka Java [TOC] Kafka筆記整理(二):Kafka Java API使用 下面的測試代碼使用的都是下面的topic: $ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking0

JAVA初識JAVA是什麽?

正常 存儲 ise 安全性 編譯程序 ssl與tls 決定 工具集 發展史 一、什麽是JAVA Java是一門面向對象編程語言,不僅吸收了C++語言的各種優點,還摒棄了C++裏難以理解的多繼承、指針等概念,因此Java語言具有功能強大和簡單易用兩個特征。 Java語言作為

JAVA程式設計師就業方向有哪幾個方面?

不難發現,在網際網路+的影響下,這幾年,中國的網際網路行業進入了高速發展的階段,同時IT行業,也成為了熱門,備受追捧和關注的行業。在全球雲端計算和移動網際網路的產業環境下,JAVA程式設計師就業方向有哪幾個方面?

為什麼大多公司不要培訓班培訓出來的JAVA程式設計師?

我先簡單介紹一下自己,我的履歷應該能讓你想看下去。 我16年年底培訓結束,靠簡歷造假第一年拿了13k,第二年跳槽拿了20k。 我畢業於一個985,文科。16年的時候發現真的太不喜歡文科了,我謀求理科方面的出路,想到了學程式設計,諮詢了某培訓機構,銷售建議我學java。 我自己去買了個ja

2018年Java程式設計師的現狀,風光背後的危機!

2018年,Java程式設計師面臨更加激烈的競爭。 不得不承認,經歷過行業的飛速發展期,網際網路的整體發展趨於平穩。為什麼這麼說? 對於進可攻前端,後可守後端大本營的 Java 程式設計師而言,雖然供應逐年上漲,但是市場似乎對他們依然青睞有加。這些承擔著技術招聘市場中高供給高需求的 Java

2018年 Java程式設計師學習大資料最佳之路!

隨著大資料時代的到來,有很多Java程式設計師想要轉行大資料。 不得不說,大資料行業可以說是為Java程式設計師量身打造的一個朝陽行業?為什麼要這麼說呢? 因為Java工程師轉型大資料具有天然進階優勢,不僅僅是前景和薪資等。技術層面來說,大資料使用的Hadoop(在分散式伺服

對於Java程式猿學習的建議

第一部分:對於尚未做過Java工作的同學,包括一些在校生以及剛準備轉行Java的同學。 一、Java基礎 首先去找一個Java的基礎教程學一下,這裡可以推薦一個地址,或者你也可以參照這個地址上去找相應的視訊,地址為http://www.runoob.com/java/java-tut

JDBC(資料庫的驅動、連線、java程式操作資料庫、事務、隔離級別、連線池等)

java操作資料庫的思想:連上資料庫,傳送sql語句。在連上資料庫之前,要先用程式啟動資料庫,因此,可以通過反射載入類驅動(com.jdbc.mysql.Driver)。通過驅動管理類的靜態方法傳遞資料庫的url來獲取一個連線物件(connection)。有三個過載的方法,第一個user和p