1. 程式人生 > >Kafka的簡單介紹與使用,生產者和消費者的JavaApi

Kafka的簡單介紹與使用,生產者和消費者的JavaApi

一、簡介
2、實時流資料管道,可以在
3、構建流式引用
4、是一個分散式流式處理平臺, 統稱訊息佇列或訊息中介軟體,有生產者和消費者之分
消費者去kafka中拉資料(而不是kafka給資料)
其實kafka就是一個臨時儲存的外掛,但是這個外掛效能很強大
kafka 是用scala編譯的
0.8版本的偏移量 可以自己管理,0.8版本用的很多
高可用、高吞吐、多副本、容錯、讀寫能力強

三、kafka的元件
1、生產者:Producer
是訊息生產的源頭,生產者不需要連線zookeeper
2、消費者:Consumer
是消費的使用方,負責消費kafka伺服器上的訊息
3、主題:Topic: 由使用者定義配置,用於建立生產者和消費者之間的訂閱關係,生產者傳送訊息到指定的Topic下,消費者從這個Topic消費訊息
4、訊息分割槽:Partition,一個Topic下面會分為多個分割槽,這個分割槽也是使用者指定的,在kafka上沒有主從結構,但是Partition上是有主從結構的
,每個分割槽都有一個活躍的(leader),leader分割槽負責讀寫,follower分割槽負責同步資料,producer會向leader的分割槽寫資料,讀資料時可以從leader讀,也可以從follower中讀,實現讀寫的壓力均攤到多型機器上,提高了讀寫能力,儘量讓分割槽的leader分散到不同的機器上,
生產環境中,副本儲存3分(最佳),配置分割槽的數量broker的數量*每一臺機器上所用的核數(充分發揮機器的多核多執行緒的功能)
follower分割槽的功能是:負責同步資料,也可以設定從分割槽讀取資料
kafka的吞吐量大, 同步資料
5、Broker:kafka 的伺服器ID,一臺機器就叫一個Broker
6、消費者組:Group,用於歸類同一消費者,在kafka中,多個消費者可以共同消費一個Topic下的訊息,每個消費者消費一部分訊息,這些消費者就組成了一個分割槽,擁有同一個分組名稱
消費者組,可以消費一個或多個分割槽的資料,相反,一個數據同一時刻,只能被一個消費者消費
7、Offset:偏移量,消費者在拉取資料過程中,需要知道訊息在檔案中的偏移量,然後才能通過偏移量拉去資料

四、使用場景
1、日誌收集
2、訊息系統(kafka其一個緩衝的作用)
3、運營指標
4、流式處理(spark streaming)
index檔案存的是索引(元資料,偏移量),先找索引檔案,在拉去真正的資料
logs,存放真正的資料

五、kafka的常用命令
1、新建一個主題
預設有一個partition(配置檔案中)分割槽數只能增加不能減少
建立一個topic,設定分割槽數,設定副本數和zookeeper,因為partition有主從結構,所以zookeeper中存放一些元資料,在讀取時也需要指定zk,找到topic對應的元資料
建立topic(topic的資訊儲存在zk中,zk中儲存了topic叫什麼,有幾個副本,幾個分割槽)在建立時,如果指定副本數大於叢集中的數量brokers會報錯kafka.admin.AdminOperationException: replication factor: 4 larger than available brokers: 3
分割槽數可以隨意指定

bin/kafka-topics.sh --create --zookeeper 192.168.163.128:2181,192.168.136.129:2181,192.168.136.130:2181 --partition 3 --replication-factor 3 --topic test-01

檢視topic
kafka-topics.sh --list --zookeeper 192.168.163.128:2181,192.168.136.129:2181,192.168.136.130:2181

檢視描述詳細資訊
kafka-topics.sh --describe --zookeeper 192.168.163.128:2181,192.168.136.129:2181,192.168.136.130:2181 --topic test-01

分割槽數 每個分割槽的leader 副本 存活副本數
數字代表broker.id
Topic:test-01 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test-01 Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test-01 Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: test-01 Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

刪除topic:真正刪除沒有刪除,要真正刪除時,要刪除zookeeper的client端的元資料
kafka-topics.sh --delete --zookeeper 192.168.163.128:2181,192.168.136.129:2181,192.168.136.130:2181 --topic test-01
顯示 ,再 list查詢時 顯示 test-01 - marked for deletion
要真正刪除時,要刪除zookeeper的client端的元資料:
rmr /brokers/topics/test-01 ,刪除client中的brokers中topic對應的
rmr /config/topics/test-01 ,刪配置
rmr /admin/delete_topics,刪除admin中的標記為刪除的topics
最後刪除真正的資料 rm -rf test-01-2 rm -rf test-01-*

修改分割槽數,不能修改分割槽數
bin/kafka-topics.sh --alter --zookeeper 192.168.163.128:2181,192.168.136.129:2181,192.168.136.130:2181 --partition 5 --topic test-03

六、API操作
啟動生產者
bin/kafka-console-producer.sh --broker-list 192.168.136.128:9092,192.168.136.129:9092,192.168.136.130:9092 --topic test-03
啟動消費者
kafka-console-consumer.sh --zookeeper 192.168.136.128:2181,192.168.136.129:2181,192.168.136.130:2181 --topic test-03 --from-beginning(代表從頭消費,如果先啟動生產者,在啟動消費者之前就有生產,如果在啟動消費者時使用 --form–beginning會 從頭開始消費,如果不使用,則是在消費 消費者 啟動之後 生產者生產的資料,如果有多個消費者且不再同一個組裡面,那麼會有多個消費者都實時監控生產者生產的資料)

Java API

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class Producer1day03 {
    public static void main(String[] args) {
        final Properties prop = new Properties();
                  //"metadata.broker.list"
        prop.put("metadata.broker.list","192.168.136.128:9092," +
                "192.168.136.129:9092," +
                "192.168.136.130:9092");
        prop.put("serializer.class","kafka.serializer.StringEncoder");
        ProducerConfig conf = new ProducerConfig(prop);
        Producer<String, String> producer = new Producer<>(conf);
        int i = 0;
        while (true){
            KeyedMessage<String, String> msg = new KeyedMessage<String, String>("test02","msg"+i);
            producer.send(new KeyedMessage<String,String>("test-03",
                    "msg"+i));
            i++;
        }

    }
}

消費者

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class Consumersday03 {
    private static final String topic ="test-03";
    private static final int threads = 2;
    public static void main(String[] args) {
        //配置屬性
        final Properties prop = new Properties();
        prop.put("zookeeper.connect","192.168.136.128:2181," +
                "192.168.136.129:2181,192.168.136.130:2181"
                );
        //配置消費者組
        prop.put("group.id","kkk");
        //配置從頭消費資料
        //prop.put("auto.offset.reset","smallset");
        //建立消費者
        final ConsumerConfig conf = new ConsumerConfig(prop);
        final ConsumerConnector connector = Consumer.createJavaConsumerConnector(conf);
        //建立Map,主要用來儲存多個topic資訊
        final HashMap<String, Integer> map = new HashMap<>();
        map.put(topic,threads);
        //建立獲取資訊流
        final Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                connector.createMessageStreams(map);

        //讀取某一個
        final List<KafkaStream<byte[], byte[]>> kafkaStreams = messageStreams.get(topic);

        //迴圈接收Map內的topic資料
        for(KafkaStream<byte[], byte[]>  stream: kafkaStreams){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for(MessageAndMetadata<byte[], byte[]> m :stream){
                        String msg = new String(m.message());
                        System.out.println(msg);
                    }
                }
            }).start();
        }


    }
}