
Big Data Cluster Setup and Usage, Part 8: Kafka Configuration and Usage

This series of guides uses a real cluster environment built on three Tencent Cloud servers, not a pseudo-distributed cluster.

Alternatively, visit my personal blog site (link).

Kafka

Configuration

Kafka depends on ZooKeeper, so first make sure ZooKeeper is installed on the cluster and starts correctly.
I wasted a whole day debugging, and the bug turned out to be simple (at least the cluster has not crashed so far).

Create the directory tree /opt/kafka/kafka2.12
Create /root/kafka/kafka-logs/logs to hold Kafka's logs
Configure the environment variables in /etc/profile, adding the bin directory
Edit the configuration file kafka/config/server.properties
1. Set broker.id; keep the id consistent with ZooKeeper's myid

(At least that seems to be the rule; this bug cost me a whole day.) Every host must have a different id, and before each change you need to verify (or simply delete) the Kafka log metadata, i.e. check whether the id recorded in /root/kafka/kafka-logs/metaxxxx matches broker.id (see the shell sketch after the config file below).
2. Set zookeeper.connect and zookeeper.connection.timeout.ms

zookeeper.connect=master:2181,slave1:2181,slave2:2181
zookeeper.connection.timeout.ms=6000

3. Set log.dirs (remember: you must create the directory first, because Kafka will not create it for you)
4. Modify the two listener entries (search with /listener in vim) and fill in the hostname manually (e.g. master, slave1, slave2); according to the documentation, changing just one of them is actually enough.

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://master:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://master:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/root/kafka/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=master:2181,slave1:2181,slave2:2181/kafka

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

delete.topic.enable=true
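Each broker needs its own copy of this file: a unique broker.id and its own hostname in listeners and advertised.listeners. A minimal shell sketch of the per-host edits, assuming the paths used in this guide (the sed patterns are just one way to do it; on slave2 use id 2 and hostname slave2):

# On slave1: give this broker id 1 and advertise its own hostname.
cd /opt/kafka/kafka2.12
sed -i 's/^broker.id=.*/broker.id=1/' config/server.properties
sed -i 's#^listeners=.*#listeners=PLAINTEXT://slave1:9092#' config/server.properties
sed -i 's#^advertised.listeners=.*#advertised.listeners=PLAINTEXT://slave1:9092#' config/server.properties

# If the broker exits right after an id change, compare the id cached in the
# log directory's metadata (the metaxxxx file mentioned above; in recent
# versions this is meta.properties) with broker.id, or simply wipe the directory.
cat /root/kafka/kafka-logs/meta.properties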

Startup

cd ${KAFKA_HOME}

bin/kafka-server-start.sh config/server.properties > /root/kafka/kafka-logs/logs 2>&1 &

This stores the broker's console output at /root/kafka/kafka-logs/logs (alternatively, bin/kafka-server-start.sh -daemon config/server.properties runs it as a daemon).
Use the jps command to check whether Kafka started successfully.
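A quick sanity check; the pids and neighbouring processes below are illustrative, but the broker does show up under the name Kafka:

$ jps
2625 QuorumPeerMain
3120 Kafka
3198 Jps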

Notes on a server being hacked

Symptom: Kafka kept failing (the Kafka process exited by itself one or two minutes after starting), yet nothing was recorded in the logs.

  • [ ] Inspected the system with top; an introduction to top is here (link)
  • [ ] Reconfigured Kafka and checked every configuration item; it still crashed
  • [ ] Reconfigured the ZooKeeper ensemble that Kafka depends on; it still crashed
  • [ ] Shut down unneeded processes such as hbase, yarn, storm and hdfs, then restarted Kafka; it still crashed
  • [ ] Rebooted the servers and restarted every process in the order hdfs, yarn, zookeeper, hbase, storm, kafka; it still crashed
  • [ ] Rebooted the servers and switched Kafka versions; it still crashed
  • [ ] Noticed that on the servers where Kafka kept dying, CPU usage was nearly 100% while master was normal; the 100% CPU came from a java process whose actual task was unknown
  • [ ] Googling showed that Hadoop clusters run a risk of excessive CPU usage (see this link); suspected the datanode, the cause being a side effect that a Linux kernel memory-allocation optimization has on Hadoop
  • [ ] Shut down all Hadoop-related processes (only jps left); CPU usage was still 100%, while the master node's CPU was normal; again the 100% CPU came from a java process with an unknown task
  • [ ] Tried starting Kafka: master fine, slaves failed, slave CPU still at 100%
  • [ ] Rebooted every server in the cluster while watching CPU in real time (top); started zookeeper first (normal), then kafka: all nodes normal
  • [ ] Started the other services one by one: hdfs had no effect, yarn no effect, zookeeper no effect, hive no effect; once hbase started, CPU shot to 100% and Kafka died; the process responsible was the regionserver
  • [ ] Ran everything except hbase: all normal; cause unknown
  • [ ] The next morning the servers were down again
  • [ ] Following Baidu results, reconfigured the YARN framework parameters
  • [ ] Rebooted; still crashed
  • [ ] Based on the links here and here, there was good reason to believe the cloud servers had been compromised
  • [ ] Temporary workaround:
  • [ ] Consulted Tencent Cloud support, changed the security-group configuration and closed port 8088
  • [ ] All components now run normally. HBase was suspected earlier because the malware needed some time to run, and I happened to start HBase in exactly that window; it is also possible the malware depended on HBase as its database

Using Kafka

Basic concepts

  • Kafka is a distributed message caching system
  • The servers in a Kafka cluster are called brokers
  • Kafka has two kinds of clients, producers (message writers) and consumers (message readers); both kinds talk to the Kafka servers over TCP
  • Messages from different business systems are kept apart by topic, and every topic is partitioned to spread the message read/write load
  • Each partition can have several replicas to guard against data loss
  • Any update to a partition's data must go through the leader among that partition's replicas
  • Consumers can be grouped: if two consumers A and B jointly consume the topic testTopic, A and B never receive the same message. With 100 messages numbered 0-99, if A consumes 0-49 then B consumes 50-99. A consumer may also specify the starting offset to consume from
  • Kafka architecture diagram:

    A producer is a data source, e.g. a Flume pipeline; a consumer is a data sink, e.g. a Storm topology.

  • The Kafka servers support splitting messages by topic and by partition. Different subsystems can use different topics; partitioning exists for load balancing.

kafka shell

  • Create a topic
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 1 --topic mytopics

Topic names are constrained: "Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both."

  • If you pointed Kafka at a directory inside ZooKeeper, e.g. /kafka, then every topic operation in the shell must name the ZooKeeper path the topic lives under, e.g. bin/kafka-topics.sh --create --zookeeper master:2181/kafka --replication-factor 3 --partitions 1 --topic mytopics (Kafka's clustering is managed by ZooKeeper).

  • List current topics

bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
  • Delete a topic
kafka-topics.sh --delete --zookeeper master:2181 --topic mytopics

Note the console hint: "Note: This will have no impact if delete.topic.enable is not set to true." In other words, you need to edit server.properties and append delete.topic.enable=true as the last line.

  • Start a console producer
kafka-console-producer.sh --broker-list master:9092 --topic t_test
  • Start a console consumer
kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic t_test
  • Show topic status
kafka-topics.sh --describe --zookeeper master:2181 --topic t_test
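For a topic with one partition and replication factor 3 spread over brokers 0, 1 and 2, the describe output looks roughly like this (illustrative):

Topic:t_test    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: t_test   Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2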


Isr lists the brokers currently in sync. If you kill one server, for example the one hosting Leader: 0, i.e. the Kafka process on server 0 (master), with kill -9 pid,

Kafka fails over immediately, and production and consumption carry on unaffected.
Once the Kafka process is brought back, the three servers resync right away.
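A minimal sketch of that experiment, assuming the broker being killed runs on master (the pid comes from jps):

# On master: find and kill the broker process.
jps | grep Kafka
kill -9 <pid>

# From any node: Leader moves to another broker and Isr shrinks.
kafka-topics.sh --describe --zookeeper master:2181 --topic t_test

# Restart the broker; Isr grows back to 0,1,2 shortly afterwards.
cd ${KAFKA_HOME} && bin/kafka-server-start.sh -daemon config/server.properties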

kafka java demo

Maven is the recommended way to build the project; if you do not use Maven, just import the jars under libs in the Kafka distribution.

Producer example

Configuration notes

  • bootstrap.servers: the address of the Kafka cluster.
  • acks: the acknowledgement mechanism for messages (the classic Java producer defaults to 1, not 0).
  • acks=0: the producer does not wait for any response from Kafka.
  • acks=1: the leader writes the message to its local log but does not wait for successful responses from the other machines in the cluster.
  • acks=all: the leader waits until all followers have replicated the message. The message then cannot be lost unless every machine in the Kafka cluster fails; this is the strongest guarantee available.
  • retries: if set to a value greater than 0, the client resends any message whose send failed.
  • batch.size: when several messages are headed for the same partition, the producer merges them into fewer network requests, improving efficiency for both the client and the broker.
  • key.serializer: key serializer, e.g. org.apache.kafka.common.serialization.StringSerializer.
  • value.serializer: value serializer, e.g. org.apache.kafka.common.serialization.StringSerializer.

With the configuration in place, the producer can publish data via producer.send(), whose arguments are the topic, key and value. If the topic does not yet exist in the Kafka cluster, a new topic is created automatically (I have not checked which settings the auto-created topic ends up with).

package cn.colony.cloudhadoop.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {

    public static void main(String[] args) throws InterruptedException{
        Properties props = new Properties();// configuration items
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");// new-API way to specify the Kafka cluster
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        String messageStr = null;
        for (int i = 1;i<1000;i++){
            Thread.sleep(50);
            messageStr = "hello, this is "+i+"th message";
            producer.send(new ProducerRecord<String, String>("t_topic","Message",messageStr));
        }
        producer.close();
    }
}
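producer.send() is asynchronous and returns a Future. To observe the effect of the acks setting per record, the send call inside the loop can be given a callback; a minimal sketch (add imports for Callback and RecordMetadata from org.apache.kafka.clients.producer):

producer.send(new ProducerRecord<String, String>("t_topic", "Message", messageStr),
        new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null)
                    exception.printStackTrace();   // failed even after the configured retries
                else
                    System.out.println("acked: partition=" + metadata.partition()
                            + ", offset=" + metadata.offset());
            }
        });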

Consumer example

Configuration notes

  • bootstrap.servers: the address of the Kafka cluster.
  • group.id: the consumer group name. Different groups can consume the same data independently: if group A has already consumed 1000 records from Kafka and you want to consume those 1000 records again without regenerating them, simply switch to a different group name.
  • enable.auto.commit: whether offsets are committed automatically; defaults to true.
  • auto.commit.interval.ms: how often the offsets returned by poll() are auto-committed.
  • session.timeout.ms: the session timeout.
  • max.poll.records: the maximum number of records returned by a single poll.
  • auto.offset.reset: where to start consuming when there is no committed offset (this demo uses earliest; the client's own default is latest).
  • earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning.
  • latest: if a partition has a committed offset, resume from it; otherwise consume only data newly produced to the partition.
  • none: if every partition has a committed offset, resume from those offsets; if any partition lacks one, throw an exception.
  • key.deserializer: key deserializer, e.g. org.apache.kafka.common.serialization.StringDeserializer.
  • value.deserializer: value deserializer, e.g. org.apache.kafka.common.serialization.StringDeserializer.

After subscribing to a topic, the consumer can start consuming data.

package cn.colony.cloudhadoop.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerDemo implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public ConsumerDemo(String topicName){
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        props.put("group.id", GROUDID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run(){
        int messageNum = 1;
        try{
            for (;;){
                msgList = consumer.poll(500);
                if (msgList!=null && msgList.count()>0){
                    for (ConsumerRecord<String, String> record : msgList){
                        if (messageNum % 50 ==0){
                            System.out.println(messageNum+"=receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                        }
                        if (messageNum % 1000 == 0)
                            break;
                        messageNum++;
                    }
                }
                else{
                    Thread.sleep(1000);
                }
            }
        }
        catch (InterruptedException e){
            e.printStackTrace();
        }
        finally{
            consumer.close();
        }
    }

    public static void main(String[] args){
        ConsumerDemo demo = new ConsumerDemo("t_topic");
        Thread thread = new Thread(demo);
        thread.start();
    }
}
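If enable.auto.commit is set to false, the consumer must commit offsets itself, which avoids marking records as consumed before they have actually been processed. A minimal sketch of the poll loop with manual commits, assuming the same properties as above except for the auto-commit flag:

props.put("enable.auto.commit", "false");  // take over offset management

// ...construct the consumer and subscribe exactly as above, then:
for (;;) {
    ConsumerRecords<String, String> records = consumer.poll(500);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("receive: key = " + record.key()
                + ", value = " + record.value() + " offset===" + record.offset());
    }
    if (!records.isEmpty())
        consumer.commitSync();  // commit only after the whole batch was processed
}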

Running the demos

Use two console views in Eclipse to watch the output; thanks to the earlier configuration, code running locally can observe what happens on the cloud cluster. Messages produced by the producer are consumed by the consumer.
For how to attach two consoles to the output of two different Java programs, see here (link).