1. 程式人生 > >Kafka叢集部署與shell命令列操作

Kafka叢集部署與shell命令列操作

1、kafka簡介
在流式計算中,Kafka一般用來快取資料,Storm通過消費Kafka的資料進行計算。

KAFKA + STORM +REDIS

1、Apache Kafka是一個開源訊息系統,由Scala寫成。是由Apache軟體基金會開發的一個開源訊息系統專案。
2、Kafka最初是由LinkedIn開發,並於2011年初開源。2012年10月從Apache Incubator畢業。該專案的目標是為處理實時資料提供一個統一、高通量、低等待的平臺。
3、Kafka是一個分散式訊息佇列:生產者、消費者的功能。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規範的實現。
4、Kafka對訊息儲存時根據Topic進行歸類,傳送訊息者稱為Producer,訊息接受者稱為Consumer,此外kafka叢集有多個kafka例項組成,每個例項(server)稱為broker。
5、無論是kafka叢集,還是producer和consumer都依賴於zookeeper叢集儲存一些meta資訊,來保證系統可用性
注:類JMS訊息佇列,結合JMS中的兩種模式,可以有多個消費者主動拉取資料,在JMS中只有點對點模式才有消費者主動拉取資料。
2、kafka叢集部署


kafka叢集跟storm叢集類似,分為:下載安裝包、解壓安裝包、修改配置檔案、分發安裝包(之後需要在各個機器上再次修改配置檔案)、啟動叢集。

準備工作
這裡使用三臺機器進行叢集:mini1,mini2,mini3
關閉防火牆,各機器的hosts檔案如下

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6 localhost.jinbm
192.168.25.127 mini1 192.168.25.129 mini2 192.168.25.130 mini3

1、下載安裝包並上傳到mini1機器上,這裡下載的是 kafka_2.11-0.8.2.0.tgz;
2、解壓安裝包導指定目錄,我這臺機器的apps目錄下安裝了很多大資料相關的軟體,所以解壓到該目錄了,並且重新命名。

[root@mini1 ~]# tar -zxvf kafka_2.11-0.8.2.0.tgz -C apps/
[root@mini1 ~]# cd apps/
[root@mini1 apps]# ll
總用量 28
drwxr-xr-x.  8 root   root   4096
1019 15:15 apache-flume-1.6.0-bin drwxrwxr-x. 10 hadoop hadoop 4096 930 22:04 hadoop-2.6.4 drwxr-xr-x. 7 root root 4096 1030 00:20 hbase drwxr-xr-x. 8 root root 4096 1017 12:38 hive drwxr-xr-x. 5 root root 4096 129 2015 kafka_2.11-0.8.2.0 drwxr-xr-x. 10 root root 4096 1030 15:34 storm drwxr-xr-x. 10 root root 4096 1029 23:21 zookeeper-3.4.6 [root@mini1 apps]# mv kafka_2.11-0.8.2.0/ kafka [root@mini1 apps]# ll 總用量 28 drwxr-xr-x. 8 root root 4096 1019 15:15 apache-flume-1.6.0-bin drwxrwxr-x. 10 hadoop hadoop 4096 930 22:04 hadoop-2.6.4 drwxr-xr-x. 7 root root 4096 1030 00:20 hbase drwxr-xr-x. 8 root root 4096 1017 12:38 hive drwxr-xr-x. 5 root root 4096 129 2015 kafka drwxr-xr-x. 10 root root 4096 1030 15:34 storm drwxr-xr-x. 10 root root 4096 1029 23:21 zookeeper-3.4.6

3、修改配置檔案,進入kafka的config目錄下,可以檢視到各類配置檔案,這裡修改server.properties 。

[[email protected] config]# ll
總用量 32
-rw-r--r--. 1 root root 1199 129 2015 consumer.properties
-rw-r--r--. 1 root root 3846 129 2015 log4j.properties
-rw-r--r--. 1 root root 2228 129 2015 producer.properties
-rw-r--r--. 1 root root 5559 129 2015 server.properties
-rw-r--r--. 1 root root 3325 129 2015 test-log4j.properties
-rw-r--r--. 1 root root  993 129 2015 tools-log4j.properties
-rw-r--r--. 1 root root 1023 129 2015 zookeeper.properties
[[email protected] config]# vi server.properties 
#broker的全域性唯一編號,不能重複
broker.id=0

#用來監聽連結的埠,producer或consumer將在此埠建立連線
port=9092

#處理網路請求的執行緒數量
num.network.threads=3

#用來處理磁碟IO的執行緒數量
num.io.threads=8

#傳送套接字的緩衝區大小
socket.send.buffer.bytes=102400

#接受套接字的緩衝區大小
socket.receive.buffer.bytes=102400

#請求套接字的緩衝區大小
socket.request.max.bytes=104857600

#kafka執行日誌存放的路徑
log.dirs=/root/apps/kafka

#topic在當前broker上的分片個數
num.partitions=2

#用來恢復和清理data下資料的執行緒數量
num.recovery.threads.per.data.dir=1

#segment檔案保留的最長時間,超時將被刪除
log.retention.hours=168

#滾動生成新的segment檔案的最大時間
log.roll.hours=168

#日誌檔案中每個segment的大小,預設為1G
log.segment.bytes=1073741824

#週期性檢查檔案大小的時間
log.retention.check.interval.ms=300000

#日誌清理是否開啟
log.cleaner.enable=true

#broker需要使用zookeeper儲存meta資料
zookeeper.connect=mini1:2181,mini2:2181,mini3:2181

#zookeeper連結超時時間
zookeeper.connection.timeout.ms=6000

#partion buffer中,訊息的條數達到閾值,將觸發flush到磁碟
log.flush.interval.messages=10000

#訊息buffer的時間,達到閾值,將觸發flush到磁碟
log.flush.interval.ms=3000

#刪除topic需要server.properties中設定delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true

#此處的host.name為本機IP(重要),如果不改,則客戶端會丟擲:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=mini1

4、將mini1上的kafka分發到mini2和mini3上

[root@mini1 ~]# scp -r apps/kafka/ mini2:/root/apps/
[root@mini1 ~]# scp -r apps/kafka/ mini3:/root/apps/

5、再次修改mini2和mini3上的配置檔案server.properties,修改兩處,一個是broker編號一個是主機名
mini2上如下

#broker的全域性唯一編號,不能重複
broker.id=1
#此處的host.name為本機IP(重要),如果不改,則客戶端會丟擲:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=mini2

mini3上如下

#broker的全域性唯一編號,不能重複
broker.id=2
#此處的host.name為本機IP(重要),如果不改,則客戶端會丟擲:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=mini3

6、到這裡配置完了,但是為了不進入kafka的bin目錄下就能執行命令列操作,所以需要修改環境變數。三臺機器上修改均如下(新增最後兩行)

[[email protected] ~]# vi /etc/profile
...
JAVA_HOME=/heima32/jdk1.7.0_55/
HADOOP_HOME=/root/apps/hadoop-2.6.4
export PATH=$JAVA_HOME/bin:$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HBASE_HOME=/root/apps/hbase
export PATH=$PATH:$HBASE_HOME/bin
export STORM_HOME=/root/apps/storm
export PATH=$PATH:$STORM_HOME/bin
export KAFKA_HOME=/root/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin

7、啟動叢集
依次啟動mini1,mini2和mini3上的kafka(進入kafka安裝目錄執行以下命令)

[root@mini1 kafka]# bin/kafka-server-start.sh config/server.properties

3、kafka命令列操作
1、建立topic(注:因為修改了環境變數,所以不管有沒有進入bin目錄都能執行相應命令列操作)
引數表示:指定副本數,使用幾個partition(分片),topic名稱

[[email protected] bin]# ./kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic orderMq
Created topic "orderMq".
[[email protected] kafka]# kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic test
Created topic "test".

2、檢視當前存在哪些topic

[[email protected] kafka]# kafka-topics.sh --list --zookeeper mini1:2181
orderMq
test

3、刪除指定topic

[root@mini1 kafka]# kafka-topics.sh --delete --zookeeper mini1:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@mini1 kafka]# kafka-topics.sh --list --zookeeper mini1:2181
orderMq

4、通過shell命令列傳送message(訊息)

[[email protected] bin]# kafka-console-producer.sh --broker-list mini1:9092 --topic orderMq
hello tom
hi jerry
spring ^H
hhaah
xixi
nini

5、通過shell命令列消費訊息

[[email protected] bin]# kafka-console-consumer.sh --zookeeper mini1:2181 -from-beginning --topic orderMq
hello tom
hi jerry
spring 
hhaah
xixi
nini

注:這裡的生成和消費是同順序的,但有時候是不會順序一致的,只能保證消費一個partition裡面的訊息時是一致,消費多個partition(上面指定了訊息被髮送到了4個partition)裡面的是不能保證順序完全一致。

6、檢視某個topic的詳情
orderMq這個topic有用了三個partition(自己建立的時候指定的),各自有2個副本(也是自己建立的時候指定的)以及對於的leader(用處後面講)。

[[email protected] bin]# ./kafka-topics.sh --topic orderMq --describe --zookeeper mini1:2181
Topic:orderMq   PartitionCount:3        ReplicationFactor:2     Configs:
        Topic: orderMq  Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2
        Topic: orderMq  Partition: 1    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: orderMq  Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2

接下來再建立個topic並且檢視詳情(使用4個partition,兩個副本),並且進入執行日誌存放的路徑檢視

[[email protected] bin]# kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 4 --topic payment
Created topic "payment".
[[email protected] bin]# kafka-topics.sh --topic payment --describe --zookeeper mini1:2181
Topic:payment   PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: payment  Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: payment  Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: payment  Partition: 2    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: payment  Partition: 3    Leader: 2       Replicas: 2,0   Isr: 2,0
[root@mini1 kafka]# cd kafka-logs/
[root@mini1 kafka-logs]# ll
總用量 32
-rw-r--r--. 1 root root    4 1121 22:34 cleaner-offset-checkpoint
drwxr-xr-x. 2 root root 4096 1121 22:25 orderMq-0
drwxr-xr-x. 2 root root 4096 1121 22:25 orderMq-1
drwxr-xr-x. 2 root root 4096 1121 23:13 payment-1
drwxr-xr-x. 2 root root 4096 1121 23:13 payment-2
drwxr-xr-x. 2 root root 4096 1121 23:13 payment-3
-rw-r--r--. 1 root root   64 1122 03:38 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root   64 1122 03:39 replication-offset-checkpoint
[root@mini1 kafka-logs]# cd orderMq-1
[root@mini1 orderMq-1]# ll
總用量 4
-rw-r--r--. 1 root root 10485760 1121 22:31 00000000000000000000.index
-rw-r--r--. 1 root root      194 1121 22:39 00000000000000000000.log
#00000000000000000000.log裡面是傳送的訊息內容,亂碼就沒貼出來了。

相關推薦

Kafka叢集部署shell命令操作

1、kafka簡介 在流式計算中,Kafka一般用來快取資料,Storm通過消費Kafka的資料進行計算。 KAFKA + STORM +REDIS 1、Apache Kafka是一個開源訊息系統,由Scala寫成。是由Apache軟體基金會開發的一個開源

kafka shell 命令操作

kafka的命令列操作     1、產看當前叢集中已存在的主題topic         bin/kafka-topics.sh --list --zookeeper bigdata:2181  &n

MongoDB之——基於Shell命令操作記錄

一、插入記錄下面我們來建立一個 test 的集合並寫入一些資料. 建立兩個物件 j 和 t , 並儲存到集合中去.在例子裡 “>” 來表示是 shell 輸入提示符> j = { name : "mongo" }; {"name" : "mongo"} >

Shell命令操作

shell提示符 [[email protected] ~]$ 如果最後一個字元是"#",表示當前終端會話有超級使用者許可權。使用root使用者登入或者使用能提供超級使用者許可權的終端能獲得該許可權。 提示符由名為PS1(prompt strin

kafka叢集部署配置手冊

本文中包含了一套kafka叢集的部署、配置、除錯和壓測的技術方法。 在三個主機節點上進行部署。 server1:192.168.10.1 server2:192.168.10.2 server3:192.168.10.3 1、jdk7u80的安裝與配置 rpm -iv

Spark單機叢集安裝簡單命令使用

參考自書籍《Hadoop+Spark 大資料巨量分析與機器學習》 環境依賴: jdk 1.7 scala 2.11.6 spark 2.1.2 1 安裝scala $ wget https://www.scala-lang.org/files/archiv

Kafka(二): Kafka 叢集部署使用

一、Kafka 叢集部署                                                               Kafka是一種分散式的釋出(producer)/訂

Kafka叢集部署命令操作

1、叢集規劃 hadoop102                                 hadoop103                          hadoop104 zk                                      

shell 從檔案中讀取批量檔名並做命令操作

222檔案內容: /home/zhangsuosheng/Desktop/9-30/9_30/1bak/1538291162.png /home/zhangsuosheng/Desktop/9-30/9_30/1bak/1538291212.png /home/zhangsuosheng/Deskto

HDFS基本命令操作簡單API使用

1.開啟叢集命令  start-dfs.sh 2.檢視幫助 hdfs dfs -help 3.檢視當前目錄資訊 hdfs dfs -ls / 4.從本地上傳檔案  hdfs dfs -put data.txt  /wc/in 5.從hdfs下載

大資料之hbase(一) --- HBase介紹,特性,安裝部署shell命令,client端hbase的互動過程,程式設計API訪問hbase實現百萬寫入

一、HBase介紹 ---------------------------------------------- 1.基於hadoop的資料庫,具有分散式,可伸縮的大型資料儲存 2.用於對資料的隨機訪問,實時讀寫 3.巨大的表,十億行*百萬列

storm概述、叢集安裝和簡單的命令操作

http://storm.apache.org Apache Storm是一個免費的開源分散式實時計算系統。Storm可以輕鬆可靠地處理無限資料流,實現Hadoop對批處理所做的實時處理。Storm非常簡單,可以與任何程式語言一起使用,並且使用起 來很有趣! Storm有許多用例:實時分析,

linux的命令操作shell的區別

      這個問題也是突然想起來的,因為我身邊的同事有時候會把這兩個區別的表述,所以我就好奇了,到底是什麼關係呢,他們都能操作Linux系統啊。。後來百度一下才知道大致的區別。 一、什麼是shell 答:       shell翻譯成殼的意思,它是包

kafka命令操作

1)檢視當前叢集中已存在的主題topic bin/kafka-topics.sh --zookeeper master:2181 --list 2)建立topic bin/kafka-topics.sh --zookeeper master:2181 --create -

maven專案的建立配置、maven命令操作、引數說明、依賴管理

一、建maven專案(eclipse ee版本) 選這兩項就可以建maven專案了 二、設定maven 一是修改setting.xml(maven的conf目錄) 修改本地倉庫地址:<localRepository>d:/.m2/repositor

有關HDFS程式設計:HDFS Shell命令常見操作

在/usr/local/hadoop/etc/hadoop 目錄下:幫助相關命令1.hdfs dfs     可以顯示hdfs常用命令[email protected]:/usr/local/hadoop/etc/hadoop$ hdfs dfs Usage: h

shell命令之檔案操作

【檔案操作】 1、反選刪除檔案 先執行:shopt -s extglob 再執行:rm -rf !(file1) rm -rf !(file1|file2) 2、清空檔案內容 :> file 【磁碟管理】 1、檢視當前目錄下個檔

HDFS的shell(命令客戶端操作)

HDFS提供shell命令列客戶端,使用方法如下:常用命令引數介紹-help             功能:輸出這個命令引數手冊 -ls                  功能:顯示目錄資訊 示例: hadoop fs -ls hdfs://hadoop-serve

大資料(三十):zookeeper叢集kafka叢集部署

一、安裝Zookeeper 1.叢集規劃 在hadoop102、hadoop103和hadoop104三個節點上部署Zookeeper。 2.解壓安裝        1.解壓zookeeper安裝包到/usr/local/目錄下 tar -zxvf zookeepe

[非命令操作]GitHub中的mergeconflict

此處有3張圖,分別為2個branch:master和follower; 這是master!劃重點了啊!這個是master裡面的檔案,和follower沒有關係的 這個!這個是follower!看清了!follower裡面的檔案,master還不認識它呢! 第三張圖就是m