流式計算--kafka1(kafka叢集搭建)
1、Kafka是什麼
在流式計算中,Kafka一般用來快取資料,Storm通過消費Kafka的資料進行計算。KAFKA + STORM +REDIS
- Kafka是一個分散式訊息佇列:生產者、消費者的功能。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規範的實現。Kafka由Scala寫成。
- Kafka對訊息儲存時根據Topic進行歸類,傳送訊息者稱為Producer,訊息接受者稱為Consumer,此外kafka叢集有多個kafka例項組成,每個例項(server)成為broker。
- 無論是kafka叢集,還是producer和consumer都依賴於zookeeper
2、JMS是什麼
JMS是Java提供的一套技術規範,生產消費者模式(生產者、伺服器、消費者)
用來異構系統 整合通訊,緩解系統瓶頸,提高系統的伸縮性增強系統使用者體驗,使得系統模組化和元件化變得可行並更加靈活
JMS訊息傳輸模型:
- 點對點模式(一對一,消費者主動拉取資料,訊息收到後訊息清除)點對點模型通常是一個基於拉取或者輪詢的訊息傳送模型,這種模型從佇列中請求資訊,而不是將訊息推送到客戶端。這個模型的特點是傳送到佇列的訊息被一個且只有一個接收者接收處理,即使有多個訊息監聽者也是如此。
- 釋出/訂閱模式(一對多,資料生產後,推送
JMS核心元件:
- Destination:訊息傳送的目的地,也就是前面說的Queue和Topic。
- Message :從字面上就可以看出是被髮送的訊息。
-
Producer: 訊息的生產者,要傳送一個訊息,必須通過這個生產者來發送。
- MessageConsumer: 與生產者相對應,這是訊息的消費者或接收者,通過它來接收一個訊息。
3.常見的類JMS訊息伺服器
JMS訊息伺服器ActiveMQ(他支援事務,rocketMq也支援)主要特點:
- 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP(高階訊息佇列協議)
- 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA訊息,事務)
- 對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性
- 通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上
- 支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支援通過JDBC和journal提供高速的訊息持久化
- 從設計上保證了高效能的叢集,客戶端-伺服器,點對點
- 支援Ajax
- 支援與Axis的整合
- 可以很容易得呼叫內嵌JMS provider,進行測試
分散式訊息中介軟體 RocketMQ(分散式,億級訊息堆積能力):
- 能夠保證嚴格的訊息順序
- 提供豐富的訊息拉取模式
- 高效的訂閱者水平擴充套件能力
- 實時的訊息訂閱機制
- 億級訊息堆積能力
- Metaq3.0 版本改名,產品名稱改為RocketMQ
kafka的主要的特點是為了高吞吐量,支援動態擴容
為什麼需要訊息佇列:解耦,非同步和並行
4.Kafka核心元件
- Topic :訊息根據Topic進行歸類
- Producer:傳送訊息者
- Consumer:訊息接受者
- broker:每個kafka例項(server)
- Zookeeper:依賴叢集儲存meta資訊。
5.kafka叢集搭建
首先本篇部落格基於Storm叢集上搭建kafka叢集,所以機器的配置和Storm的一樣
所以首先起來zk:
1.下載安裝包
在linux中使用wget命令下載安裝包
Sudo yum -y install wget
wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz
2.解壓安裝包
tar -zxvf /export/software/kafka_2.11-0.8.2.2.tgz -C /export/servers/
cd /export/servers/
3. 配置環境變數:
export KAFKA_HOME= /export/servers/kafka/kafka_2.12-0.10.2.1
export PATH=$KAFKA_HOME/bin:$PATH
4.修改kafka配置檔案:
#每臺伺服器的broker.id都不能相同
broker.id=1
host.name=192.168. 25.130
advertised.host.name=192.168.25.130
advertised.port=9092
delete.topic.enable=true
#用來監聽連結的埠,生產者和消費者將在此埠建立連結
port=9092
#處理網路請求的執行緒數量
num.network.threads=3
#處理磁碟io的執行緒數量
num.io.threads=8
#傳送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接收套接字緩衝區的大小
socket.receive.buffer.bytes=102400
#請求套接字的額緩衝區的大小
socket.request.max.bytes=104857600
#kafka執行日誌的存放的路徑
log.dirs=/export/servers/logs/kafka
#topic在當前broker上的數量
num.partitions=1
#用來恢復和清理data下資料的執行緒數量
num.recovery.threads.per.data.dir=1
#segment檔案保留的最長時間,超時將被刪除
log.retention.hours=168
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
zookeeper.connect=192.168.25.130:2181,192.168.25.131:2181,192.168.25.132:2181
分發到其他機器上,修改id,修改host.name,修改環境變數
sudo scp -r kafka/ [email protected]:$pwd
5.依次啟動kafka:
bin/kafka-server-start.sh config/server.properties
啟動報錯修稿資料夾許可權:
sudo chown -R storm:storm export/
6.kafka常用命令:
- 檢視當前伺服器中的所有topic:kafka-topics.sh --list --zookeeper zk01:2181
2建立topic:kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 2 --partitions 4 --topic orderMq
3.通過shell來來生產kafka的訊息:kafka-console-producer.sh --broker-list storm01:9092 --topic orderMq
4.通過shell來消費kafka的訊息:kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic orderMq
分佈:
終極圖片(Mr.Mao的圖):