分布式消息隊列Kafka集群安裝
kafka是LinkedIn開發並開源的一個分布式MQ系統,現在是Apache的一個孵化項目。在它的主頁描述kafka為一個高吞吐量的分布式(能將消息分散到不同的節點上)MQ。在這片博文中,作者簡單提到了開發kafka而不選擇已有MQ系統的原因。兩個原因:性能和擴展性。Kafka僅僅由7000行Scala編寫,據了解,Kafka每秒可以生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
Kafka版本:0.8.0
約定:安裝3臺虛擬機
官網:http://kafka.apache.org/
官方文檔:http://kafka.apache.org/documentation.html#quickstart
下載解壓
# wget http://mirrors.hust.edu.cn/apache/kafka/0.8.0/kafka-0.8.0-src.tgz
# tar xzf kafka-0.8.0-src.tgz
# cd kafka-0.8.0-src
Kafka是用Scala寫的,SBT是Simple Build Tool的簡稱,如果讀者使用過Maven,那麽可以簡單將SBT看做是Scala世界的Maven,雖然二者各有優劣,但完成的工作基本是類似的。
## Building it ##
# ./sbt update
# ./sbt package
# ./sbt assembly-package-dependency
以上每一步完成就會提醒[Success]
例如:[success] Total time: 21 s, completed 2014-2-11 10:29:55
集群環境需要修改配置文件
# vim config/server.properties
brokerid:這個每個server(broker)必須唯一,寫數字
hostname:這個也是唯一的,寫服務器IP即可
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=3 ############################# Socket Server Settings ############################# # The port the socket server listens on port=9092 # Hostname the broker will bind to and advertise to producers and consumers. # If not set, the server will bind to all interfaces and advertise the value returned from # from java.net.InetAddress.getCanonicalHostName(). #host.name=localhost host.name=192.168.2.111
還有就是zookeeper.connect也要配置
zookeeper.connect=192.168.19.218:2181,192.168.19.217:2181,192.168.19.214:2181
關於zookeeper的安裝可以參考此文:ZooKeeper集群環境安裝與配置其他默認配置即可。
啟動Kafka服務
# /usr/kafka-0.8.0-src/bin/kafka-server-start.sh /usr/kafka-0.8.0-src/config/server.properties
創建Topic
# /usr/kafka-0.8.0-src/bin/kafka-create-topic.sh --zookeeper localhost:2181 --partition 1 --topic test
查看Topic
# /usr/kafka-0.8.0-src/bin/kafka-list-topic.sh --zookeeper localhost:2181
輸出:
topic: test partition: 0 leader: 1 replicas: 1 isr: 1
說明:
partiton: partion id,由於此處只有一個partition,因此partition id 為0
leader:當前負責讀寫的lead broker id
relicas:當前partition的所有replication broker list
isr:relicas的子集,只包含出於活動狀態的broker
producer發送消息
# /usr/kafka-0.8.0-src/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
consumer接收消息
# /usr/kafka-0.8.0-src/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
註意,如果上述命令不能發送接收消息說明沒有配置host,可以直接用ip
producer發送消息
# /usr/kafka-0.8.0-src/bin/kafka-console-producer.sh --broker-list 192.168.19.218:9092 --topic test
consumer接收消息
# /usr/kafka-0.8.0-src/bin/kafka-console-consumer.sh --zookeeper 192.168.19.218:2181 --topic test --from-beginning
如果要最新的數據,可以不帶--from-beginning參數即可。
# /usr/kafka-0.8.0-src/bin/kafka-console-consumer.sh --zookeeper 192.168.19.218:2181 --topic test
在kafka的核心思路中,不需要在內存裏緩存數據,因為操作系統的文件緩存已經足夠完善和強大,只要不做隨機寫,順序讀寫的性能是非常高效的。kafka的數據只會順序append,數據的刪除策略是累積到一定程度或者超過一定時間再刪除。Kafka另一個獨特的地方是將消費者信息保存在客戶端而不是MQ服務器,這樣服務器就不用記錄消息的投遞過程,每個客戶端都自己知道自己下一次應該從什麽地方什麽位置讀取消息,消息的投遞過程也是采用客戶端主動pull的模型,這樣大大減輕了服務器的負擔。Kafka還強調減少數據的序列化和拷貝開銷,它會將一些消息組織成Message Set做批量存儲和發送,並且客戶端在pull數據的時候,盡量以zero-copy的方式傳輸,利用sendfile(對應java裏的FileChannel.transferTo/transferFrom)這樣的高級IO函數來減少拷貝開銷。可見,kafka是一個精心設計,特定於某些應用的MQ系統,這種偏向特定領域的MQ系統我估計會越來越多,垂直化的產品策略值的考慮。
只要磁盤沒有限制並且不出現損失,kafka可以存儲相當長時間的消息(一周)。
原文出自:本人另一個博客http://blog.csdn.net/unix21/
再分享一下我老師大神的人工智能教程吧。零基礎!通俗易懂!風趣幽默!還帶黃段子!希望你也加入到我們人工智能的隊伍中來!http://www.captainbed.net
分布式消息隊列Kafka集群安裝