1. 程式人生 > >kafka入門一:安裝與使用

kafka入門一:安裝與使用

Kafak安裝與使用

一、前言

    kafka是Apache平臺下的一種分散式釋出/訂閱訊息系統,也就是訊息中介軟體。在之前我使用的是ActiveMQ,初次接觸Kafka,先從最基本的路數走起,後續再進行深入的學習。

二、Kafka下載與安裝

    Kafka版本:1.0.0

    2.1 下載

    下載地址:

        https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.12-1.0.0.tgz

    

        以上為下載地址

    2.2 linux下載命令,拿其中之一舉例

$> wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz

    2.3 單機安裝

        解壓、安裝

$> mv kafka_2.12-1.0.0.tgz /usr/local
$> tar -zxf kafka_2.12-1.0.0.tgz

       Kafka強依賴於ZooKeeper,啟動Kafka必須先啟動ZooKeeper。單機時在configure/server.properties中的預設配置為zookeeper.connect=localhost:2181,暫不用作修改,直連本機ZooKeeper即可

        啟動服務(後臺)

$> ./bin/kafka-server-start.sh ./config/server.properties &

        停止時,先停止Kafka,再停止ZooKeeper

$> ./bin/kafka-server-stop.sh
$> cd /usr/local/zookeeper-3.4.11/
$> ./bin/zkServer.sh stop

        測試

            建立topic。建立一個名為“test”的topic,他只有一個分割槽--partition,一個副本--replication-factor

$> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

            驗證topic是否建立成功

$> ./bin/kafka-topics.sh --list --zookeeper localhost:2181
test

            在Console模式下,啟動producer傳送訊息(ctrl c退出Console模式)

$> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 > love
 > and
 > peace

            在Console模式下,啟動consumer消費訊息

$> ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
love
and
peace

            在使用Console時,會有如下warning。提示是可以直連borker-list的

          Using the ConsoleConsumer/Producer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

$> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2.4 叢集部署

2.5 Kafka監控工具——KafkaOffsetMonitor安裝部署

    請自行百度安裝包下載,需要注意一點的是,在jar包中有些js檔案需要科學上網才能訪問,相對應也有本地化修改的版本,我沒有找到,如果你有,請聯絡我(正經臉)!

    安裝部署:

        KafkaOffsetMonitor的所有執行資源已經打包為一個jar檔案,我們可以新建一個單獨的目錄存放monitor檔案。在同目錄下,編寫啟動指令碼:

vim monitorStart.sh

        輸入以下內容

#! /bin/bash
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
 com.quantifind.kafka.offsetapp.OffsetGetterWeb \    # 指明執行Web監控的類
 --zk localhost:2181 \                               # 連線的ZooKeeper伺服器地址
 --port 8080 \                                       # 監控器Web執行埠
 --refresh 10.seconds \                              # 頁面資料重新整理時間
 --retain 1.days                                     # 頁面資料保留時間

        更改sh檔案可執行許可權

chmod u+x monitorStart.sh

        後臺啟動監控程式,即可訪問(注意是否該科學上網)

nohup ./monitorStart.sh &

        訪問虛擬機器伺服器地址,我的是192.168.81.129:8080,如下,第一次訪問時為空白,進行一次消費即可


三、Java客戶端

    首先在kafka伺服器上新生成一個topic

$> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic java-topic_test

    Java端訊息生產者

public class JavaKafkaProducer {
    private Logger logger = Logger.getLogger("JavaKafkaProducer");
    /**
     * 設定例項生產訊息的總數
     */
    private static final int MSG_SIZE = 10;
    /**
     * 主題名稱
     */
    public static final String TOPIC = "java-topic_test";
    /**
     * kafka伺服器節點
     */
    private static final String BROKER_LIST = "192.168.81.129:9092";

    private static KafkaProducer<String,String> producer = null;

    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    // 1.kafka producer引數設定
    private static Properties initConfig() {
        Properties props = new Properties();
        // broker列表
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        // 設定序列化的類
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        /**
         * 0        不等待結果返回
         * 1        等待至少有一個伺服器返回資料接收標識
         * -1或all   表示必須接受到所有的伺服器返回標識,及同步寫入
         */
        props.put("request.required.acks", "0");
        /**
         * 內部發送資料是非同步還是同步
         * sync 同步,預設
         * async非同步
         */
        props.put("producer.type", "async");

        // bootstrap.servers地址,必須指定
        props.put("bootstrap.servers", "192.168.81.129:9092");

        // 設定分割槽類,可以使用自定義分割槽類
//        props.put("partitioner.class", "JavaKafkaProducerPartitioner");

        // 延遲傳送時間
//        props.put("linger.ms","1");

        // 重試次數
        props.put("message.send.max.retries", 0);

        // 非同步提交的時候(async),併發提交的記錄數
        props.put("batch.num.message", 200);

        // 設定緩衝區大小,預設10kb
        props.put("send.buffere.bytes", "102400");
        return props;
    }

    /**
     * 產生一個訊息
     */
    private static String generateMessage() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            sb.append((new Random()).nextInt(20)).append(" ");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ProducerRecord<String, String> record = null;
        String message = null;
        try {
            int num = 0;
            for (int i = 0; i<MSG_SIZE; i++) {
                message = generateMessage();
                System.out.println(message);
                record = new ProducerRecord<String, String>(TOPIC,"SCDN", message);
                producer.send(record, new Callback() {
                    public void onCompletion (RecordMetaData recordMetadata, Exception e) {
                        if(e != null) {
                            e.printStackTrace();
                            System.out.println("傳送訊息失敗!");
                        } else {
                            System.out.println("傳送訊息成功!");
                        }
                    }
                );
                if(num++ % 10 == 0) {
                    Thread.sleep(2000);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
                producer = null;
            }
        }
    }
}

    自定義分割槽器

public class JavaKafkaProducerPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    /**
     * 無參構造
     */
    public JavaKafkaProducerPartitioner() {
        this(new VerifiableProperties());
    }

    /**
     * 建構函式,必須給定
     *
     * @param properties
     */
    public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
        // nothing
    }

    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }
    // 分割槽方法
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // 除錯使用
        System.out.println("key is " + key);
        System.out.println("value is " + new String(valueBytes)); // 此方法和下面的方法都是列印value的值
        System.out.println("value is " + value);

        return new Random().nextInt(100) % numPartitions; // 返回分割槽
    }

    public void close() {

    }

    public void configure(Map<String, ?> map) {

    }
}

    在kafka伺服器中使用如下命令檢視相應主題下的offset偏移量日誌

$> cd $kafka
$> ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/java-topic_test-0/00000000000000000000.log --print-data-log

    Java端消費者

public class JavaKafkaConsumer {

    static Properties props = new Properties();

    static {
        props.put("bootstrap.servers", "192.168.81.129:9092");
        props.put("group.id", "test");
        props.put("client.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    }

    public static void main(String[] args) {
        // 1.初始化消費者
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 2.訂閱主題,指定一個監聽器,用於在消費者發生平衡操作時回撥響應的業務處理
        consumer.subscribe(Arrays.asList("java-topic_test"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                consumer.commitAsync(); // 提交偏移量
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // 獲取該分割槽下已消費的偏移量
                long commitedOffset = -1;
                for (TopicPartition topicPartition : partitions) {
                    // 獲取該分割槽下已消費的偏移量
                    commitedOffset = consumer.committed(topicPartition).offset();
                    // 重置偏移量到上一次提交的偏移量下一個位置處開始消費
                    consumer.seek(topicPartition, commitedOffset + 1);
                }
            }
        });

        try {
            while (true) {
                // 長輪詢拉取訊息
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String , String> record : records) {
                    System.out.printf("partition = %d, offset = %d,key= %s value = %s%n",
                            record.partition(), record.offset(),
                            record.key(),record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

ProducerConsole

121411218
key is CSDN0
value is 121411218
partition is 3
-----------
訊息傳送成功!

ComsumerConsole

partition = 3, offset = 42,key= CSDN0 value = 121411218
成功生產和消費