
Testing load balancing across multiple consumers in the same Kafka consumer group (Spring integration)

This setup uses a 3-node cluster for both ZooKeeper and Kafka, so that the system keeps running even if one machine goes down. In a cluster deployment, each broker's server.properties must point at the ZooKeeper cluster address (among other settings); the most important setting here is num.partitions=3, the default partition count for new topics, so that each broker hosts one partition and produced messages are spread across all three machines.

This is exactly where I hit a pitfall: the cluster was configured and the topic was created with 3 partitions and 2 replicas, yet messages were only arriving at a single machine, and only that one machine had the topic's log files.

Admittedly, multi-consumer load balancing still works in that situation (all consumers simply pull messages from the one broker that receives them), but a key reason for running a cluster is fault tolerance: if that single broker dies, nothing runs at all, let alone load balancing.
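To rule this problem out, the topic can be created explicitly with 3 partitions and a replication factor of 2, and the resulting assignment checked afterwards. The original post does not show this step; below is a minimal sketch using the Kafka AdminClient (available in kafka-clients 0.11+), with the topic name test1 and the broker addresses taken from the configuration further down:

package cn.wzy;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        "192.168.60.131:9092,192.168.60.132:9092,192.168.60.133:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      // 3 partitions, replication factor 2: the partition leaders should
      // end up spread across the three brokers
      NewTopic topic = new NewTopic("test1", 3, (short) 2);
      admin.createTopics(Collections.singleton(topic)).all().get();

      // print the resulting partition/replica assignment as a sanity check
      System.out.println(admin.describeTopics(Collections.singleton("test1"))
          .all().get().get("test1"));
    }
  }
}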

Central configuration file (application.xml), which loads config.properties and imports the consumer and producer configurations:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">
  <bean
    class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <list>
        <value>classpath:config.properties</value>
      </list>
    </property>
  </bean>
  <import resource="kafka/consumer1.xml"/>
  <import resource="kafka/consumer2.xml"/>
  <import resource="kafka/producer.xml"/>
</beans>

Consumer 1 (kafka/consumer1.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">


  <!-- consumer configuration properties -->
  <bean id="consumerFactory1" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
      <map>
        <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
        <entry key="group.id" value="1"/>
        <entry key="enable.auto.commit" value="true"/>
        <entry key="auto.commit.interval.ms" value="1000"/>
        <entry key="session.timeout.ms" value="15000"/>
        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.LongDeserializer"/>
        <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
      </map>
    </constructor-arg>
  </bean>

  <!-- the class that actually consumes the messages -->
  <bean id="messageListernerConsumerService1" class="cn.wzy.JobConsumer1"/>

  <!-- listener container configuration -->
  <bean id="containerProperties1" class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg value="test1"/>
    <property name="messageListener" ref="messageListernerConsumerService1"/>
  </bean>

  <bean id="messageListenerContainer1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
        init-method="doStart">
    <constructor-arg ref="consumerFactory1"/>
    <constructor-arg ref="containerProperties1"/>
  </bean>

</beans>

Consumer 2 (kafka/consumer2.xml): the configuration is identical, in particular the same group.id of 1, which is what puts both consumers into one consumer group; only the bean ids and the message-handling class differ:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">


  <!-- consumer configuration properties -->
  <bean id="consumerFactory2" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
      <map>
        <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
        <entry key="group.id" value="1"/>
        <entry key="enable.auto.commit" value="true"/>
        <entry key="auto.commit.interval.ms" value="1000"/>
        <entry key="session.timeout.ms" value="15000"/>
        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.LongDeserializer"/>
        <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
      </map>
    </constructor-arg>
  </bean>

  <!-- the class that actually consumes the messages -->
  <bean id="messageListernerConsumerService2" class="cn.wzy.JobConsumer2"/>

  <!-- listener container configuration -->
  <bean id="containerProperties2" class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg value="test1"/>
    <property name="messageListener" ref="messageListernerConsumerService2"/>
  </bean>

  <bean id="messageListenerContainer2" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
        init-method="doStart">
    <constructor-arg ref="consumerFactory2"/>
    <constructor-arg ref="containerProperties2"/>
  </bean>

</beans>
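Since both containers use the same group.id (1) and subscribe to the same topic, Kafka splits the 3 partitions of test1 between the two consumers (one gets two partitions, the other gets one). To watch the assignment happen, a ConsumerRebalanceListener can be attached; this is not part of the original setup, but ContainerProperties exposes a consumerRebalanceListener property in spring-kafka (worth verifying against the version in use). A minimal sketch of such a listener:

package cn.wzy;

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not part of the original post: prints which partitions
// this consumer instance holds after every rebalance.
public class LoggingRebalanceListener implements ConsumerRebalanceListener {

  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    System.out.println("partitions revoked: " + partitions);
  }

  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    System.out.println("partitions assigned: " + partitions);
  }
}

It could then be wired into each containerProperties bean, e.g. with <property name="consumerRebalanceListener" ref="..."/>.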

Each handler class simply prints the messages it receives:

package cn.wzy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

public class JobConsumer1 implements MessageListener<Long, String> {

  public void onMessage(ConsumerRecord<Long, String> record) {
    System.out.println("==JobConsumer1 received:" + record.key() + " : " + record.value());
  }


}
package cn.wzy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

public class JobConsumer2 implements MessageListener<Long, String> {

  public void onMessage(ConsumerRecord<Long, String> record) {
    System.out.println("==JobConsumer2 received:" + record.key() + " : " + record.value());
  }


}

Producer (kafka/producer.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">

  <!-- producer configuration properties -->
  <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
      <map>
        <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
        <entry key="group.id" value="0"/>
        <entry key="retries" value="10"/>
        <entry key="batch.size" value="16384"/>
        <entry key="linger.ms" value="1"/>
        <entry key="buffer.memory" value="33554432"/>
        <entry key="key.serializer" value="org.apache.kafka.common.serialization.LongSerializer"/>
        <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
      </map>
    </constructor-arg>
  </bean>
  <!-- listener for send results -->
  <bean id="listener" class="cn.wzy.Listener"/>
  <!-- KafkaTemplate bean: inject it wherever messages need to be sent and call its send methods -->
  <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory"/>
    <constructor-arg name="autoFlush" value="true"/>
    <property name="defaultTopic" value="test1"/>
    <property name="producerListener" ref="listener"/>
  </bean>

</beans>
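The bean cn.wzy.Listener referenced above is the producer-side send listener; its source is not shown in this post. A minimal sketch of what such a listener could look like, assuming spring-kafka 1.x where ProducerListenerAdapter provides no-op defaults (the callback signatures changed in 2.x):

package cn.wzy;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListenerAdapter;

// Sketch only: the original cn.wzy.Listener is not shown in the post.
// Logs the outcome of every send performed through the KafkaTemplate.
public class Listener extends ProducerListenerAdapter<Long, String> {

  @Override
  public void onSuccess(String topic, Integer partition, Long key, String value,
      RecordMetadata recordMetadata) {
    System.out.println("sent: topic=" + topic
        + " partition=" + recordMetadata.partition()
        + " offset=" + recordMetadata.offset()
        + " key=" + key);
  }

  @Override
  public void onError(String topic, Integer partition, Long key, String value,
      Exception exception) {
    System.err.println("send failed: key=" + key + " - " + exception.getMessage());
  }

  @Override
  public boolean isInterestedInSuccess() {
    return true; // without this, onSuccess is never invoked
  }
}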

config.properties (referenced by the PropertyPlaceholderConfigurer above):

bootstrap.servers=192.168.60.131:9092,192.168.60.132:9092,192.168.60.133:9092

Test class:

package cn.wzy;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.Random;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:application.xml")
public class SendTest {


  @Autowired
  private KafkaTemplate<Long, String> kafkaTemplate;

  @Test
  public void test() throws InterruptedException {
    for (long i = 1; i < 50; i++) {
      kafkaTemplate.sendDefault(i,"message : hello world");
    }
    Thread.sleep(5000);
  }
}

Result: the messages are split between JobConsumer1 and JobConsumer2 according to the partitions assigned to each consumer within group 1, which is exactly the load balancing this test set out to demonstrate.