1. 程式人生 > >Kafka簡單入門與Spring結合實踐

Kafka簡單入門與Spring結合實踐

Kafka簡單入門與Spring結合實踐

一、【安裝部署kafka伺服器環境(centos7.0)】: 

1.【注意】新版的kafka已經內建了一個zookeeper環境

2.【安裝與執行】

可以在kafka官網 http://kafka.apache.org/downloads 下載到最新的kafka安裝包,選擇下載二進位制版本的tgz檔案。本章用的是 kafka_2.11-2.0.0(Scala 2.11、Kafka 2.0.0),在centos7.0上直接解壓即可。

3.【執行命令】(在kafka解壓目錄的bin目錄下執行,先啟動zookeeper,再啟動kafka):

./zookeeper-server-start.sh ../config/zookeeper.properties &

./kafka-server-start.sh ../config/server.properties &

4.【注意】:

【問題一】:Java端的消費者取不到訊息,生產者訊息也沒傳送成功。Java通過kafka-clients的API寫的程式碼始終不能跟kafka通訊:java producer的訊息發不出去,java consumer也收不到任何訊息。

【解決辦法】:修改kafka/config/server.properties 中的advertised.listeners,把這個值改成自己虛擬機器的IP地址,例如 advertised.listeners=PLAINTEXT://192.168.80.150:9092,修改後需重啟kafka。

【問題二】:WARN [Consumer clientId=consumer-1, groupId=console-consumer-950] Connection to node -1 could not be established. Broker may not be available.

【解決辦法】:檢視宿主機是否可以ping通centos7.0虛擬機器的IP地址;如果ping不通,通常是虛擬機器的IP地址已經改變,需要把程式與server.properties中的IP改成新的地址。

 二、【Java程式 + Spring -Kafka 執行例項】:

參考部落格:https://www.cnblogs.com/hei12138/p/7805475.html

1.【Java程式】 :

  pom.xml 依賴配置:【注意】:其中版本號要與伺服器端版本一致

<!-- Kafka Java client, version 2.0.0; the slf4j-log4j12 logging binding is excluded. -->
<dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.0.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- Kafka Streams, same version and same logging exclusion as kafka-clients. -->
            <!-- NOTE(review): none of the sample classes in this article import kafka-streams; confirm it is needed. -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>2.0.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>

 【主題配置】:TopicMain.java

package com.caox.kafka._01_topic;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Creates a single Kafka topic through the {@link AdminClient} API.
 *
 * <p>Created by nazi on 2018/8/27.
 *
 * @author nazi
 */
public class TopicMain {

    /**
     * Connects to the broker, requests creation of one topic and blocks until the
     * broker has answered.
     *
     * @param argv command-line arguments (unused)
     * @throws InterruptedException  if the thread is interrupted while waiting for the result
     * @throws IllegalStateException if the broker reports that the topic could not be created
     */
    public static void main(String[] argv) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.150:9092");

        // NewTopic(String name, int numPartitions, short replicationFactor):
        // a topic named "topic-test3" with 1 partition and a replication factor of 1.
        NewTopic newTopic = new NewTopic("topic-test3", 1, (short) 1);
        ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
        topics.add(newTopic);

        // try-with-resources closes the admin client on success and on failure alike.
        try (AdminClient adminClient = AdminClient.create(props)) {
            CreateTopicsResult result = adminClient.createTopics(topics);
            // Block until the creation request for every topic in the list has completed.
            result.all().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating so callers can still observe it.
            Thread.currentThread().interrupt();
            throw e;
        } catch (ExecutionException e) {
            // Surface the failure with context instead of only dumping a stack trace.
            throw new IllegalStateException("Failed to create topic " + newTopic.name(), e);
        }
    }

}

【生產者】:ProducerMain.java

package com.caox.kafka._01_topic;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by nazi on 2018/8/27.
 * @author nazi
 */
@Slf4j
public class ProducerMain {

    public static void main(String[] argv) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.149:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++){
            producer.send(new ProducerRecord<String, String>("topic-test2", Integer.toString(i), Integer.toString(i)));
        }
        log.info("call SUCCESS");
        producer.close();
    }

}

 【消費者】:ConsumerMain.java

package com.caox.kafka._01_topic;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

/**
 * Created by nazi on 2018/8/27.
 * @author nazi
 */
@Slf4j
public class ConsumerMain {

    public static void main(String[] argv) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.149:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList("topic-test2"),new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                //將偏移設定到最開始
                consumer.seekToBeginning(collection);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

}

 2.【Spring-Kafka配置程式】 :

【pom.xml】依賴配置:

<!-- Spring Framework modules (context, core, beans), all pinned to 5.0.6.RELEASE. -->
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-context</artifactId>
	<version>5.0.6.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-core</artifactId>
	<version>5.0.6.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-beans</artifactId>
	<version>5.0.6.RELEASE</version>
</dependency>
<!-- Spring for Apache Kafka: the Java samples use its KafkaTemplate, @EnableKafka and @KafkaListener. -->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.1.6.RELEASE</version>
</dependency>

【kafka配置】:KafkaConfig.java:

package com.caox.kafka._02_spring_kafka;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that wires the Kafka admin client, one topic, the producer side
 * ({@link ProducerFactory} / {@link KafkaTemplate}) and the consumer side
 * ({@link ConsumerFactory} / listener container factory).
 *
 * <p>Created by nazi on 2018/8/28.
 *
 * @author nazi
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Broker address shared by the admin, producer and consumer configurations. */
    private static final String BOOTSTRAP_SERVERS = "192.168.80.150:9092";

    // ------------------------------------------------------------------ topic configuration

    /**
     * Admin bean pointing at the broker.
     *
     * @return the configured {@link KafkaAdmin}
     */
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        return new KafkaAdmin(configs);
    }

    /**
     * Declares a topic named "foo" with 10 partitions and a replication factor of 1.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic topic1() {
        return new NewTopic("foo", 10, (short) 1);
    }

    // ------------------------------------------------------- producer factory and template

    /**
     * Producer factory built from {@link #producerConfigs()}.
     *
     * @return factory for producers with Integer keys and String values
     */
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Raw producer properties; keys come from {@link ProducerConfig} so a misspelt
     * property name fails at compile time rather than being ignored at runtime.
     *
     * @return mutable map of producer settings
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /**
     * Template used by application code to send messages.
     *
     * @return template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // ------------------------------------------------------------- consumer configuration

    /**
     * Container factory backing the listener containers.
     *
     * @return factory wired to {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Consumer factory built from {@link #consumerConfigs()}.
     *
     * @return factory for consumers with Integer keys and String values
     */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw consumer properties: consumer group "test", auto-commit every 1000 ms,
     * Integer keys and String values.
     *
     * @return mutable map of consumer settings
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /**
     * Registers the listener bean. By default spring-kafka creates one thread per
     * listener method to pull messages from the Kafka server.
     *
     * @return the listener instance
     */
    @Bean
    public SimpleConsumerListener simpleConsumerListener() {
        return new SimpleConsumerListener();
    }

}

【生產者配置】:ProducerMain.java

package com.caox.kafka._02_spring_kafka;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Created by nazi on 2018/8/28.
 * @author nazi
 */
public class ProducerMain {

    /**
     * 建立訊息生產者
     * @param argv        引數
     * @throws Exception  異常
     */
    public static void main(String[] argv) throws Exception {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(KafkaConfig.class);
        KafkaTemplate<Integer, String> kafkaTemplate = (KafkaTemplate<Integer, String>) ctx.getBean("kafkaTemplate");
        String data="this is a test message";
        ListenableFuture<SendResult<Integer, String>> send = kafkaTemplate.send("topic-test3", 1, data);
        send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            public void onFailure(Throwable throwable) {

            }

            public void onSuccess(SendResult<Integer, String> integerStringSendResult) {
                System.out.println("success to receive message !");
            }
        });
    }

}

【消費者配置】:消費者監聽配置 SimpleConsumerListener.java:

package com.caox.kafka._02_spring_kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.concurrent.CountDownLatch;

/**
 * Message listener that prints every record received from the "topic-test3" topic.
 *
 * <p>Created by nazi on 2018/8/28.
 *
 * @author nazi
 */
public class SimpleConsumerListener {
    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumerListener.class);

    /**
     * Handles one record from "topic-test3" by printing its key and value to standard output.
     * The key/value types match the Integer/String deserializers set in KafkaConfig.
     *
     * @param record the consumed record
     */
    @KafkaListener(id = "foo", topics = "topic-test3")
    public void listen(ConsumerRecord<Integer, String> record) {
        System.out.println("listen : " + " key:"+ record.key() + " value: " + record.value());
    }
}