1. 程式人生 > >Kafka叢集搭建及簡單使用(控制檯與Java客戶端)

Kafka叢集搭建及簡單使用(控制檯與Java客戶端)

1.下載

去官網下載即可

2.解壓

3.配置環境變數

/etc/profile

4.安裝zookeeper

ZooKeeper完全分散式叢集搭建

5.我先按照官方文件在單節點上用一下,先不修改配置檔案

5.1啟動zookeeper

[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/zookeeper-server-start.sh config/zookeeper.properties 
5.2啟動kafka服務
[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-server-start.sh config/server.properties
5.3建立topic
[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
5.4檢視topic
[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
5.5使用生產者往topic生產訊息
[email protected]
:/opt/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. hello world i love u r u love me
5.6使用消費者消費topic裡的訊息
[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello
world
i
love
u
r
u
love
me
--from-beginning表示從頭開始接收

6.叢集搭建及使用

6.1修改配置檔案

[email protected]:/opt/kafka_2.10-0.8.1.1/config$ vi server.properties
測試用修改兩處即可

server.properties

# The id of the broker. This must be set to a unique integer for each broker.broker伺服器的ID要唯一,在drguo3設為0,drguo4設為1,drguo5設為2,不重複就行
broker.id=0
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.下面是zookeeper連線方式
zookeeper.connect=drguo3:2181,drguo4:2181,drguo5:2181
修改後複製kafka目錄到drguo4/5
[email protected]:/opt$ scp -r kafka_2.10-0.8.1.1/ drguo4:/opt/
改一下配置檔案裡的brokerID(上面註釋中已經說了)
[email protected]:/opt$ ssh drguo4
Welcome to Ubuntu 15.10 (GNU/Linux 4.2.0-16-generic x86_64)

 * Documentation:  https://help.ubuntu.com/

Last login: Fri Apr  1 22:29:01 2016 from 192.168.80.151
[email protected]:~$ cd /opt/kafka_2.10-0.8.1.1/config/
[email protected]:/opt/kafka_2.10-0.8.1.1/config$ vi server.properties
6.2啟動zookeeper
[email protected]:/opt/kafka_2.10-0.8.1.1$ zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.8/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[email protected]:/opt/kafka_2.10-0.8.1.1$ ssh drguo4
Welcome to Ubuntu 15.10 (GNU/Linux 4.2.0-16-generic x86_64)

 * Documentation:  https://help.ubuntu.com/

Last login: Sun Apr  3 16:33:26 2016 from 192.168.80.151
[email protected]:~$ zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.8/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[email protected]:~$ exit
登出
Connection to drguo4 closed.
[email protected]:/opt/kafka_2.10-0.8.1.1$ ssh drguo5
Welcome to Ubuntu 15.10 (GNU/Linux 4.2.0-16-generic x86_64)

 * Documentation:  https://help.ubuntu.com/

Last login: Fri Apr  1 22:27:21 2016 from 192.168.80.151
[email protected]:~$ zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.8/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[email protected]:~$ exit
登出
Connection to drguo5 closed.
6.3啟動kafka服務,三臺都要啟動(如果想讓它在後臺執行,不佔用視窗在最後加個 &)
[email protected]/4/5:/opt/kafka_2.10-0.8.1.1$ bin/kafka-server-start.sh config/server.properties
6.4建立topic
[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --create --zookeeper drguo3:2181 --replication-factor 3 -partitions 1 --topic phone
Created topic "phone".
[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --list --zookeeper drguo3:2181
phone
[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --create --zookeeper drguo3:2181 --replication-factor 3 -partitions 1 --topic pc
Created topic "pc".
[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --list --zookeeper drguo3:2181
pc
phone
6.5使用生產者往topic生產訊息
[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list drguo3:9092 --topic phone
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello
world
iphone
oneplus
6.6使用消費者消費topic裡的訊息
[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-console-consumer.sh --zookeeper drguo3:2181 --from-beginning --topic phone
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello
world
iphone
oneplus

[email protected]:/opt/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --describe --zookeeper drguo3:2181 --topic phone
Topic:phone	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: phone	Partition: 0#分割槽0	Leader: 0#leader是id為0的伺服器broker	Replicas: 0,1,2#副本在0/1/2伺服器上	Isr: 0,1,2#0/1/2處於同步

7.Java客戶端

7.1生產者

package club.drguo.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerDemo {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put("zk.connect", "drguo3:2181,drguo4:2181,drguo5:2181");
		props.put("metadata.broker.list","drguo3:9092,drguo4:9092,drguo5:9092");
		//指定序列化類
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		ProducerConfig config = new ProducerConfig(props);
		Producer<String, String> producer = new Producer<String, String>(config);

		// 傳送業務訊息
		// 讀取檔案 讀取記憶體資料庫 讀socket埠
		for (int i = 1; i <= 100; i++) {
			Thread.sleep(500);
			producer.send(new KeyedMessage<String, String>("phone",
					"iphone" + i + "plus"));
		}

	}
}
7.2消費者
package club.drguo.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
	private static final String topic = "phone";
	private static final Integer threads = 1;

	public static void main(String[] args) {
		
		Properties props = new Properties();
		props.put("zookeeper.connect", "drguo3:2181,drguo4:2181,drguo5:2181");
		//消費者組
		props.put("group.id", "1");
		//偏移量自動重新設定
		props.put("auto.offset.reset", "smallest");

		ConsumerConfig config = new ConsumerConfig(props);
		ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, threads);
		
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
		
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
		
		for(final KafkaStream<byte[], byte[]> kafkaStream : streams){
			new Thread(new Runnable() {
				@Override
				public void run() {
					for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
						String msg = new String(mm.message());
						System.out.println(msg);
					}
				}
			
			}).start();
		
		}
	}
}



相關推薦

Kafka叢集搭建簡單使用控制檯Java客戶

1.下載 去官網下載即可 2.解壓 3.配置環境變數 /etc/profile 4.安裝zookeeper ZooKeeper完全分散式叢集搭建5.我先按照官方文件在單節點上用一下,先不修改配置檔案 5.1啟動zookeeper [email protec

java多執行緒通訊伺服器客戶

基於TCP的多執行緒通訊 伺服器執行緒: package com.netproject1; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOExc

Zookeeper偽分佈叢集搭建簡單使用演示

要搭建Zookeeper完全分散式叢集請戳這 第一步,下載Zookeeper穩定版 我先科普一下 很多軟體在正式釋出前都會發布一些預覽版或者測試版,一般都叫“beta版”或者 “rc版”,特別是開源軟體,甚至有“alpha版”,下面來解釋一下各個版本的意思。 alp

kafka叢集搭建原理

Apache Kafka 企業級訊息佇列 爬蟲課程:原生佇列、多執行緒重複消費的問題、ArrayBlockingQueue阻塞佇列 分散式爬蟲:使用Redis的list資料結構做佇列。 分散式電商:AMQ 訊息佇列、釋出一個商品時傳送一個訊息,有程式消費訊息建立靜態化頁

Flume NG高可用叢集搭建詳解基於flume-1.7.0

1、Flume NG簡述 Flume NG是一個分散式,高可用,可靠的系統,它能將不同的海量資料收集,移動並存儲到一個數據儲存系統中。輕量,配置簡單,適用於各種日誌收集,並支援 Failover和負載均衡。並且它擁有非常豐富的元件。Flume NG採用的是三層架構:Agent層,Collecto

Redis的安裝啟動doc和本地客戶

目錄 html redis的安裝 www doc target redis-cli span ron 安裝都是老生長談了,這裏推薦倆個文章看看把。:打開一個cmd窗口 使用cd命令切換目錄到 C:\redis 運行 redis-server.exe redis.conf(安

libevent簡單介紹以及使用帶有伺服器和客戶

這兩天使用了下libevent,只使用了網路IO事件(EV_READ和 EV_WRITE),查閱了下libevent的介面文件,這裡做點筆記,並附上程式碼,開發環境是win7+vs2010 這裡只介紹需要用到的libevent的介面函式,更多介面函式請檢視libevent官方文件 如果想了解

libevent2入門例-簡單的伺服器也有簡短客戶

  libevent 庫實在是輕量級,下面用不到百行程式碼,實現一個簡單的伺服器,其功能是在9995埠監聽連線,有連線上來就給回覆:“Hello, World!”。另外還添加了一個訊號處理回撥,當你按 Ctrl+C 時,伺服器會清理資源,乾乾淨淨地退出,這是一個伺服器開發者

Oracle資料庫搭建PL/SQL連線(免安裝客戶)

Oracle配置 Oracle資料庫安裝略。 如上圖,新建完資料庫後需要啟動的服務:1、監聽服務;2、例項服務,本文示例資料庫:greendb 一、伺服器端配置: 1、為例項配置監聽,本文路徑:E:\app\Administrator\product\11.2.0\dbh

Android 客戶伺服器進行資料互動二、登入客戶

概要 Android客戶端分為User,HttpUtil,HttpCallbackListener,MainActivity四個部分。User model與服務端的一樣,一方面是用於本地使用者資訊的儲存model,另一方面也是為了保證構造URL時使用的key一

配置zkclient開源的ZooKeeper客戶

group bsp pro artifact osc keep zookeeper cti zookeep <properties> <zkclient.version>0.1</zkclient.version> </p

Ribbon負載均衡的客戶

前面講了服務的註冊與發現,微服務專案會把專案的各個業務需求劃分成幾個模組來單獨提供服務,各服務間的呼叫都是採用Http Restful來實現,但是在SpringClound中服務間的呼叫有兩種方式:一種是ribbon+ restTemplate;另一種是feign;  Ribbon:在Spring

上傳EXCEL檔案到後,匯入並解析EXCEL的前後實現Vue.js + java

vue.js前端,Java後端,如何匯入excel檔案,並且解析,本文給了前後端程式碼的實現,以及完美實踐OK之後的分享。 前端主要用了element-ui的upload元件。 關於這個元件的官方文件很少:http://element-cn.eleme.io/#/zh-CN/compon

docker安裝fastdfsjava客戶測試

一、docker 安裝FastDFS 1、拉取映象 docker pull morunchang/fastdfs 2、建立並啟動tracker容器 docker run -d --name=tracker -v /home/fastdfs_docker/fdfs/tracker:/da

在Windows 中配置Oracle ODBC驅動不需要安裝客戶

用於測試的作業系統:Win7 64X Oracle ODBC驅動版本:11.02.00.04 安裝配置步驟: (1)解壓instantclient_12.rar檔案到任意位置(我的示例:G:\oracle odbc\instantclient_12\instantc

centos7 sftp多使用者配置java客戶連線

知識背景SFTP 為 SSH的一部分。在SSH軟體包中,已經包含了一個叫作SFTP(Secure File Transfer Protocol)的安全檔案資訊傳輸子系統,SFTP本身沒有單獨的守護程序,它必須使用sshd守護程序(埠號預設是22)來完成相應的連線和答覆操作,所

微信開發本地調試工具模擬微信客戶

con strong 分享圖片 blank width -s 公眾賬號 希望 aid 微信開發本地調試工具(模擬微信客戶端) 開源免費微信管家系統(java)源碼下載 微信部署需要依靠80端口,如何快速有效的調試

C服務java客戶的socket通訊注意事項

今天這個專案需要c服務端與java客戶端進行socket通訊。 中間遇到了很多問題。 首先搜尋了一下 http://blog.sina.com.cn/s/blog_55934df80100i55l.html 有以下幾點要注意的地方: 我的客戶端是用c寫的,屬於

springCloud學習3Netflix Hystrix彈性客戶

springcloud 總集:https://www.tapme.top/blog/detail/2019-02-28-11-33 本次用到全部程式碼見文章最下方。 一、為什麼要有客戶端彈性模式   所有的系統都會遇到故障,分散式系統單點故障概率更高。如何構建應用程式來應對故障,是每個軟體開發人員工作的關鍵部

手把手一起入門 RabbitMQ 的六大使用模式Java 客戶

# 原文地址:[手把手一起入門 RabbitMQ 的六大使用模式(Java 客戶端)](https://blog.csdn.net/Howinfun/article/details/107181334) # 為什麼使用 MQ? 在這裡我就不多說了,無非就是削峰、解耦和非同步。這裡沒有很多關於 MQ 的理論和