1. 程式人生 > >【Kafka】Kafka 1.0.1案例詳解之快速入門

【Kafka】Kafka 1.0.1案例詳解之快速入門

這個章節我們將從Kafka叢集的安裝部署講起,並測試topic的建立,訊息的釋出訂閱等功能。希望對你有所幫助。廢話不多說,我們開始
  • 單機模式的安裝
  1. 下載Kafka元件
[[email protected] kafka]$ wget http://mirrors.hust.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz–2018-05-08 11:39:56– http://mirrors.hust.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgzResolving mirrors.hust.edu.cn… 202.114.18.160Connecting to mirrors.hust.edu.cn|202.114.18.160|:80… connected.
HTTP request sent, awaiting response… 200 OKLength: 49766096 (47M) [application/octet-stream]Saving to: “kafka_2.11-1.0.1.tgz”
  1. 下載安裝包並解壓,配置zookeeper節點
因為Kafka用到了Zookeeper,這裡需要先安裝zookeeper叢集:[[email protected] zookeeper]$ wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz–2018-05-08 11:47:33– http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
Resolving mirrors.hust.edu.cn… 202.114.18.160Connecting to mirrors.hust.edu.cn|202.114.18.160|:80… connected.HTTP request sent, awaiting response… 200 OKLength: 35042811 (33M) [application/octet-stream]Saving to: “zookeeper-3.4.10.tar.gz”解壓並修改zookeeper配置檔案:[[email protected] zookeeper-3.4.10]$ tar -xzf zookeeper-3.4.10.tar.gz
# The number of milliseconds of each ticktickTime=2000# The number of ticks that the initial# synchronization phase can takeinitLimit=10# The number of ticks that can pass between# sending a request and getting an acknowledgementsyncLimit=5# the directory where the snapshot is stored.# do not use /tmp for storage, /tmp here is just# example sakes.dataDir=/home/hadoop/zookeeper/zookeeper-3.4.10/tmp# the port at which the clients will connectclientPort=2181# the maximum number of client connections.# increase this if you need to handle more clients#maxClientCnxns=60## Be sure to read the maintenance section of the# administrator guide before turning on autopurge.## http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance## The number of snapshots to retain in dataDir#autopurge.snapRetainCount=3# Purge task interval in hours# Set to “0” to disable auto purge feature#autopurge.purgeInterval=1
  1. 啟動Kafka
[[email protected] kafka_2.11-1.0.1]$ bin/kafka-server-start.sh config/server.properties
  1. 建立topic
現在我們建立一個名字為test的topic[[email protected] kafka_2.11-1.0.1]$ bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic testCreated topic “test”.然後檢檢視zookeeper中是否已經有了該topic[[email protected] kafka_2.11-1.0.1]$ bin/kafka-topics.sh –list –zookeeper localhost:2181test
  1. 傳送測試訊息
[[email protected] kafka_2.11-1.0.1]$ bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test>this is a message>this is another message
  1. 接收訊息
[[email protected] kafka_2.11-1.0.1]$ bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginningthis is a messagethis is another message至此單機模式的kafka已經可以正常收發訊息了,但是在生產環境中我們肯定是需要搭建分散式叢集的,下面我們來看下叢集模式的安裝。
  • 叢集模式的安裝
在Kafka 1.x中我們發現已經將zookeeper整合進去了,如果直接是用kafka自帶的zookeeper,那麼部署起來更方便了。
  1. 修改zookeeper配置檔案
[[email protected] kafka_2.11-1.0.1]$ vim config/zookeeper.propertiesdataDir=/home/hadoop/kafka/kafka_2.11-1.0.1/zDatadataLogDir=/home/hadoop/kafka/kafka_2.11-1.0.1/zLog# the port at which the clients will connectclientPort=2181# disable the per-ip limit on the number of connections since this is a non-production configmaxClientCnxns=0initLimit=5syncLimit=2server.1=192.168.0.181:2888:3888server.2=192.168.0.182:2888:3888server.3=192.168.0.183:2888:3888新增myid(另外兩臺伺服器的id必須不一樣,myid是zookeeper互相識別用的id):echo “1” > /home/hadoop/kafka/kafka_2.11-1.0.1/zData/myid
  1. 修改kafka配置檔案
vim config/server.propertiesbroker.id=0(另外兩臺伺服器的id必須不一樣)listeners=PLAINTEXT://:9092log.dirs=/home/hadoop/kafka/kafka_2.11-1.0.1/log
  1. 複製kafka整個安裝目錄到叢集的各個節點
scp -r kafka_2.11-1.0.1 slaver01:/home/hadoop/kafka/scp -r kafka_2.11-1.0.1 slaver02:/home/hadoop/kafka/
  1. 分別在其他節點上修改zookeeper的myid和kafka的broker.id
echo “2” > /home/hadoop/kafka/kafka_2.11-1.0.1/zData/myidecho “3” > /home/hadoop/kafka/kafka_2.11-1.0.1/zData/myidvim config/server.propertiesbroker.id=1broker.id=2
  1. 啟動叢集
啟動各個節點的zookeeper:nohup bin/zookeeper-server-start.sh config/zookeeper.properties >> zoo.out 2>&1 &啟動各個節點的kafka:nohup bin/kafka-server-start.sh config/server.properties >> kafka.out 2>&1 &
  1. 建立一個多副本的topic
[[email protected] kafka_2.11-1.0.1]$ bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 3 –partitions 1 –topic my-replicated-topicCreated topic “my-replicated-topic”.然後我們通過以下命令可以發現該topic下的各個副本均勻分佈到了各個節點broker上[[email protected] kafka_2.11-1.0.1]$ bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
  1. 傳送接收訊息測試
[[email protected] kafka_2.11-1.0.1]$ bin/kafka-console-producer.sh –broker-list localhost:9092 –topic my-replicated-topic>message 1>message 2[[email protected] kafka_2.11-1.0.1]$ bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –from-beginning –topic my-replicated-topicmessage 1message 2
  1. 高可用測試
我們kill掉leader所在的broker節點kill -9 19681[[email protected] kafka_2.11-1.0.1]$ bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0可以發現leader已經變成2了
  1. 使用Kafka Conect匯入匯出資料
echo -e “foo\nbar” > test.txt啟動連線:bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties檢查接收到的訊息:[[email protected] kafka_2.11-1.0.1]$ tail -f test.sink.txtfoo

相關推薦

KafkaKafka 1.0.1案例快速入門

這個章節我們將從Kafka叢集的安裝部署講起,並測試topic的建立,訊息的釋出訂閱等功能。希望對你有所幫助。廢話不多說,我們開始單機模式的安裝下載Kafka元件[[email protect

SpringSpring MVC原理及配置

進行 return sub sca scrip uil 線程安全 松耦合 必須 1.Spring MVC概述: Spring MVC是Spring提供的一個強大而靈活的web框架。借助於註解,Spring MVC提供了幾乎是POJO的開發模式,使得控制器的開發和測試更加簡

LinuxLinux定時任務Crontab命令

星期幾 ima 默認 定時 最好 表示 時間 path 配置文件 linux 系統則是由 cron (crond) 這個系統服務來控制的。Linux 系統上面原本就有非常多的計劃性工作,因此這個系統服務是默認啟動的。另 外, 由於使用者自己也可以設置計劃任務,所以, Lin

轉載linux下的mount命令

文件的 flag 自動加載 網絡文件系統 解決問題 選項 lock home 多個參數 以下內容來自:http://blog.csdn.net/clozxy/article/details/5299054 http://linux.chinaunix.net/techdo

C語言文件操作

pri void rfi 識別 archive format 隨機 stat 文本文 轉自:http://www.cnblogs.com/likebeta/archive/2012/06/16/2551780.html C語言中沒有輸入輸出語句,所有的輸入輸出功能都用

AndroidAndroid六種布局

spec rec 默認 bottom ron ado 居中 右下角 控制 這篇就對LinearLayout、RelativeLayout、自定義ViewGroup、FrameLayout、TableLayout、AbsoluteLayout六種布局進行詳細的講解。 1

C++拷貝構造函數

簡單的 之間 其他 創建 變量 tac 動態分配空間 data 產生 一. 什麽是拷貝構造函數 首先對於普通類型的對象來說,它們之間的復制是很簡單的,例如: int a = 100; int b = a; 而類對象與普通對象不同,類對象內部結構一般較為復雜,

轉載 c++中static的用法

ostream 並不會 style 轉載 程序員 都是 note 每次 reference 出處: http://blog.csdn.net/majianfei1023/article/details/45290467 C 語言的 static 關鍵字有三種(具體來說是

shellLinux shell for 循環

linux shell編程 for循環 運維 for 循環格式 for i in 條件 do 內容 done 實例循環1到10並打印 #!/bin/bash - for i in `seq 10` do echo $i done 版權所有:arppinging

轉載Maven依賴中的scope

lan 無需 而已 ref targe 周期 包含 配置 com Maven的一個哲學是慣例優於配置(Convention Over Configuration), Maven默認的依賴配置項中,scope的默認值是compile,項目中經常傻傻的分不清,直接默認了。今天梳

java的動態代理機制

bar 同時 @override returns 復制 exce ins com hello 在學習Spring的時候,我們知道Spring主要有兩大思想,一個是IoC,另一個就是AOP,對於IoC,依賴註入就不用多說了,而對於Spring的核心AOP來說,我們不但要知道怎

Pythonhasattr() getattr() setattr() 使用方法

att err value ror 綜合 設置 pytho clas rec 本文轉自 https://www.cnblogs.com/cenyu/p/5713686.html hasattr(object, name)判斷一個對象裏面是否有name屬性或者name方法,返

Oracle 11g Dataguard 參數

異步模式 正常 10g enable ffi sys 過程 tnsnames async 轉自:https://www.jb51.net/article/52269.htm 這篇文章主要介紹了Oracle 11g Dataguard參數詳解,包含了獨立參數、主庫參數、備

TestNGTestNG併發執行用例和範例

前言 TestNG有多種併發方式支援,方法的併發,class級的併發,test級的併發等; 根據實際應用可以靈活的配置和使用,下面分別對幾種併發方法進行說明: 一、方法級併發 方法級併發即method級併發,此種併發方式需要將xml中的suite標籤的parallel屬性設定為m

TestNGTestNG用例執行方法

一、直接在eclipse內部執行 這種方式比較簡單,就是直接右鍵一個test檔案然後選擇以testNG執行,或者選擇xml檔案執行,基本執行方法見帖子:https://mp.csdn.net/mdeditor/83243822# 二、命令列方式 除了直接再eclipse內部執行外

NLPYou May Not Need Attention

廢話: 之前蹭上了BERT的熱度,粉以個位數每天的速度增長,感謝同學們的厚愛!弄得我上週本來打算寫文字分類,寫了兩筆又放下了,畢竟文字分類有很多SOTA模型,而我的研究還不夠深入。。慢慢完善吧,今天看到一篇You may not need attention,寫attention起家的我怎麼能放過,立刻打印出

轉載BlockingQueue(阻塞佇列)

注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。 一. 前言   在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資

5Django專案配置settings.py

夫唯不爭,故天下莫能與之爭 ——老子《道德經》 本節內容 1.專案配置檔案settings.py介紹 2.資料庫配置【MySQL】 3.建立模型物件並和資料庫同步 4.python官方提供的專案後臺管理平臺的使用 注意:本節內容我們會按照三部分進行分步講解 我們建立好了一個Pyth

轉載Linux下安裝軟體命令

【轉載日期】2018.09.11 【轉載標題】Linux下安裝軟體命令詳解 ----------------------------------------------------------------------------------------------

vuevee-validate 表單驗證

Pre:安裝   npm install [email protected]   內建的校驗規則 after{target} - 比target要大的一個合法日期,格式(DD/MM/YYYY) alpha - 只包