1. 程式人生 > >zookeeper+kafka集群安裝之二

zookeeper+kafka集群安裝之二

聲明 r+ object ise width cli top 直接 partition

版權聲明:本文為博主原創文章,未經博主同意不得轉載。 https://blog.csdn.net/cheungmine/article/details/26683941

zookeeper+kafka集群安裝之二

此為上一篇文章的續篇, kafka安裝須要依賴zookeeper, 本文與上一篇文章都是真正分布式安裝配置, 能夠直接用於生產環境.

zookeeper安裝參考:

http://blog.csdn.net/ubuntu64fan/article/details/26678877

首先了解幾個kafka中的概念:

  1. kafka是一個消息隊列服務器,服務稱為broker, 消息發送者稱為producer, 消息接收者稱為consumer;
  2. 通常我們部署多個broker以提供高可用性的消息服務集群.典型的是3個broker;
  3. 消息以topic的形式發送到broker,消費者訂閱topic,實現按需取用的消費模式;
  4. 創建topic須要指定replication-factor(復制數目, 通常=broker數目);
  5. 每一個topic可能有多個分區(partition), 每一個分區的消息內容不會反復:

假定我們有一個名稱為test的topic, 分區數目為2, 當我們發送到這個test詳細的消息"msg1:hello beijing"和"msg2:hello shanghai"的時候,我們怎樣知道消息的發送路徑呢(發往哪個分區)?

msg1如果被發送到分區test.1,則肯定不會發送到test.2. 數據發送路徑選擇決策受kafka.producer.Partitioner的影響:

interface Partitioner {
    int partition(java.lang.Object key, int numPartitions);
}

一個偽代碼的實現例如以下:

package org.mymibao.mq.client;

import kafka.producer.Partitioner;

public class DefaultKafkaPartitioner implements Partitioner {
    private final static int FIRST_PARTITION_ID = 1;

    public int partition(Object key, int numPartitions) {
        return FIRST_PARTITION_ID;
    }
}

分區API依據相關的鍵值以及系統中具有的代理分區的數量返回一個分區id。

將該id用作索引,在broker_id和partition組成的經過排序的列表中為對應的生產者請求找出一個代理分區。缺省的分區策略是hash(key)%numPartitions。如果key為null,那就進行隨機選擇。使用partitioner.class這個配置參數可用插入自己定義的分區策略.分區文件不會跨越broker,可是多個broker上能夠有某個topic的分區副本.

kafka安裝配置參考:

1)下載KAFKA

??? $ wget http://apache.fayea.com/apache-mirror/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

安裝和配置參考上一篇文章:

http://blog.csdn.net/ubuntu64fan/article/details/26678877

2)配置$KAFKA_HOME/config/server.properties

我們安裝3個broker。分別在3個vm上:zk1。zk2,zk3:

zk1:

$ vi /etc/sysconfig/network

NETWORKING=yes
HOSTNAME=zk1

$ vi $KAFKA_HOME/config/server.properties

broker.id=0
port=9092
host.name=zk1
advertised.host.name=zk1
...
num.partitions=2
...
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

zk2:

$ vi /etc/sysconfig/network

NETWORKING=yes
HOSTNAME=zk2

$ vi $KAFKA_HOME/config/server.properties

broker.id=1
port=9092
host.name=zk2
advertised.host.name=zk2
...
num.partitions=2
...
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

zk3:

$ vi /etc/sysconfig/network

NETWORKING=yes
HOSTNAME=zk3

$ vi $KAFKA_HOME/config/server.properties

broker.id=2
port=9092
host.name=zk3
advertised.host.name=zk3
...
num.partitions=2
...
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

3)啟動zookeeper服務, 在zk1,zk2,zk3上分別執行:

$ zkServer.sh start

4)啟動kafka服務, 在zk1,zk2,zk3上分別執行:

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

5) 新建一個TOPIC(replication-factor=num of brokers)

$ kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181

列出已有的topics:

$ kafka-topics.sh --list --zookeeper zk1:2181

6)如果我們在zk2上,開一個終端發送消息至kafka(zk2模擬producer)

$ kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test

在發送消息的終端輸入:Hello Kafka

7)如果我們在zk3上,開一個終端,顯示消息的消費(zk3模擬consumer)

$ kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
在消費消息的終端顯示Hello Kafka

8) 編程操作Producer和Consumer的樣例參考:

http://shift-alt-ctrl.iteye.com/blog/1930791




zookeeper+kafka集群安裝之二