1. 程式人生 > >Linux下單機安裝部署kafka及代碼實現

Linux下單機安裝部署kafka及代碼實現

{} edt serial integer exc height 復制 有一個 images

技術交流群:233513714

這幾天研究了kafka的安裝及使用,在網上找了很多教程但是均以失敗告終,直到最後想起網絡方面的問題最終才安裝部署成功,下面就介紹一下kafka的安裝部署及代碼實現

一、關閉防火墻

重要的事情說100遍,關閉防火墻...(如果不關閉防火墻就會出現Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.等各種奇葩的問題)

1、關閉firewall:
systemctl stop firewalld.service #停止firewall

systemctl disable firewalld.service #禁止firewall開機啟動
firewall -cmd --state #查看默認防火墻狀態(關閉後顯示notrunning,開啟後顯示running)

2、關閉iptables

service iptables stop #停止iptables
chkconfig iptables off #永久關閉防火墻

service iptables status #查看防火墻關閉狀態

以上提供了關閉兩種防火墻的命令,可以選擇性操作

二、kafka安裝測試

1、安裝JRE/JDK,(kafka的運行要依賴於jdk,這裏就省略了jdk的安裝,需要註意的是jdk的版本一定要支持所下載的kafka版本,否則就會報錯,這裏我安裝的是jdk1.7)

2、下載地址:http://kafka.apache.org/downloads.html(我下載的版本是kafka_2.11-0.11.0.1)

3、解壓:

tar -xzvf kafka_2.11-0.11.0.1.tgz

rm kafka_2.11-0.11.0.1.tgz (這裏一定要刪除壓縮包,不然會出現zk或kafka啟動不起來的問題)

cd kafka_2.11-0.11.0.1

4、在kafka_2.11-0.11.0.1目錄下

/bin 啟動和停止命令等。
/config 配置文件
/libs 類庫

5、修改配置

在config下修改zookeeper.properties為如下配置

maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5

在server.properties添加如下配置

port=9092
host.name=10.61.8.6

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=6000

(以上配置沒有的就需要添加)

6、啟動、測試、停止

(1)、啟動zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties & (&是為了能退出命令行)

(2)、啟動kafka

bin/kafka-server-start.sh config/server.properties &

(3)、查看kafka和zk是否啟動

ps -ef|grep kafka

(4)、創建topic(topic的名字叫abc)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 8 --replication-factor 2 --topic abc

(5)、刪除topic

bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic abc --zookeeper localhost:2181

(6)、查看topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

(7)、producter推送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc

(8)、consumer消費消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic abc --from-beginning

(9)、停止kafka

bin/kafka-server-stop.sh

(10)、停止zookeeper

bin/zookeeper-server-stop.sh

(11)、殺死服務

kill -9 123 (123是進程號)

三、java代碼實現

producter

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;


/**
 * Created by Administrator on 2017/10/23 0023.
 */
public class KafkaProducter {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducter.class);
    private final Producer<String, String> producer;
    public final static String TOPIC = "abc";

    public static void main(String[] args) {
        new KafkaProducter().produce();
    }

    private KafkaProducter() {
        Properties props = new Properties();
        //此處配置的是kafka的端口
        props.put("metadata.broker.list", "10.61.8.6:9092");
        //配置value的序列化類
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //配置key的序列化類
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        //0、這意味著生產者從不等待來自代理的確認(與0.7相同的行為)。這個選項提供了最低的延遲,但是最弱的持久性保證(當服務器失敗時,一些數據將丟失)。
        //1、這意味著在主副本接收到數據後,生產者得到確認。這個選項提供了更好的持久性,因為客戶機一直等待直到服務器確認請求成功(只有消息被寫入到已死的領導人,但尚未被復制的消息將會丟失)。
        //-1、這意味著在所有同步副本都接收到數據之後,生產者得到確認。這個選項提供了最好的持久性,我們保證只要至少有一個同步副本,就不會丟失任何消息。
        props.put("request.required.acks", "-1");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    void produce() {
        int messageNo = 1;
        final int COUNT = 10;
        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka" + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
            log.info("",data);
            messageNo++;
        }
    }
}

consumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by Administrator on 2017/10/25 0025.
 */
public class KafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    private final ConsumerConnector consumer;
    public final static String TOPIC = "abc";


    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }

    private KafkaConsumer() {
        Properties props = new Properties();
        //zookeeper 配置
        props.put("zookeeper.connect", "10.61.8.6:2181");
        //group 代表一個消費組
        props.put("group.id", "jd-group");
        //zk連接超時
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //序列化類
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            log.info("kafka監聽到的消息:{}", it.next().message());
        }
        log.info("kafka監聽完畢");
    }

}

原創不易,您的支持是我前進的動力

技術分享技術分享

Linux下單機安裝部署kafka及代碼實現