
Using the Kafka Java Client to Send and Receive Messages

Kafka producer:

package com.zwz.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerDemo extends Thread{

    private KafkaProducer<Integer, String> producer;

    private String topic;

    public KafkaProducerDemo(String topic){

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.138:9092");  // Kafka broker address and port
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");  // client id shown in broker logs and metrics
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");  // wait for all in-sync replicas to acknowledge ("-1" is an alias for "all")
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");  // key serializer class
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");  // value serializer class

        producer = new KafkaProducer<>(properties);
        this.topic = topic;

    }


    @Override
    public void run(){

        int num = 0;
        while(num<50){

            String message = "message_" + num;
            System.out.println("begin send message: " + message);
            producer.send(new ProducerRecord<Integer, String>(topic, message));
            num++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }



    public static void main(String[] args) {


        new KafkaProducerDemo("test").start();


    }


}

Kafka consumer:

package com.zwz.test;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo extends Thread {

    private KafkaConsumer<Integer, String> kafkaConsumer;

    public KafkaConsumerDemo( String topic ){

        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.138:9092");  // Kafka broker address and port
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");    // consumer group id
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");       // commit offsets automatically
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");  // auto-commit interval in milliseconds
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");  // key deserializer class
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  // value deserializer class
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");    // start from the earliest messages when no committed offset exists

        kafkaConsumer = new KafkaConsumer<>(prop);
        kafkaConsumer.subscribe(Collections.singletonList(topic));

    }


    @Override
    public void run() {

        while (true) {

            // poll with a 1-second timeout (long overload; newer clients prefer poll(Duration))
            ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000);
            for (ConsumerRecord<Integer, String> record : consumerRecords) {
                System.out.println("message receive: " + record.value());
            }

        }

    }


    public static void main(String[] args) {

        new KafkaConsumerDemo("test").start();

    }


}

The ProducerConfig.ACKS_CONFIG (acks) setting on the producer side, "-1" in the code above, accepts the following values:

acks=0: the producer does not wait for any acknowledgment from the broker (highest throughput, but messages can be lost).

acks=1: the producer only waits for the leader replica of the partition to acknowledge the write; followers replicate it afterwards (leader/follower).

acks=all (-1): the producer waits until every replica in the ISR has acknowledged the write (every in-sync node in the cluster). This is the safest setting, although data loss is still possible in edge cases.
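The three levels differ only in the value of a single producer property. As a broker-free sketch (no Kafka client library needed, using the raw string key "acks" that ProducerConfig.ACKS_CONFIG resolves to), the configurations might look like:

```java
import java.util.Properties;

// Sketch of the three acks levels as raw producer properties.
// "acks" is the string constant behind ProducerConfig.ACKS_CONFIG;
// no broker connection is made here, we only build the config objects.
public class AcksConfigDemo {

    static Properties producerProps(String acks) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "192.168.159.138:9092");
        p.put("acks", acks);
        return p;
    }

    public static void main(String[] args) {
        Properties fireAndForget = producerProps("0");   // no acknowledgment: fastest, may lose data
        Properties leaderOnly    = producerProps("1");   // leader replica acknowledges the write
        Properties allReplicas   = producerProps("all"); // full ISR acknowledgment; "-1" is an alias

        System.out.println(fireAndForget.getProperty("acks")); // 0
        System.out.println(leaderOnly.getProperty("acks"));    // 1
        System.out.println(allReplicas.getProperty("acks"));   // all
    }
}
```

Because acks is a plain string setting, switching durability levels never requires code changes beyond this one property.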

AUTO_OFFSET_RESET_CONFIG

This setting controls where a consumer starts reading when its group has no committed offset:

earliest: for a new group id, the consumer resets to the earliest available offset and consumes from the beginning of the log.

latest: for a new group id, the consumer starts from the end of the log and only receives messages produced after it subscribes; if a committed offset already exists, consumption resumes from it.

none: the consumer throws an exception if no committed offset is found for the group.
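To make the accepted values concrete, here is a hypothetical helper that mirrors the validation the consumer performs on auto.offset.reset at startup. The real client rejects unknown values with its own ConfigException; this sketch substitutes IllegalArgumentException and is not the library's actual code:

```java
import java.util.Locale;

// Models the startup check on the auto.offset.reset consumer setting:
// only "earliest", "latest", and "none" are legal (case-insensitive).
public class OffsetResetCheck {

    static String validate(String value) {
        String v = value.toLowerCase(Locale.ROOT);
        switch (v) {
            case "earliest": // reset to the oldest available offset
            case "latest":   // reset to the log-end offset (new messages only)
            case "none":     // no reset; fail when no committed offset exists
                return v;
            default:
                throw new IllegalArgumentException("invalid auto.offset.reset: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(validate("earliest")); // earliest
        System.out.println(validate("LATEST"));   // latest
    }
}
```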