kafka測試同一個消費組的多個消費者負載均衡例項(整合spring)
阿新 • • 發佈:2019-01-11
這裡使用的是由3臺機器組成的zookeeper和kafka叢集,這樣能保證如果有一臺機器炸了還能繼續執行。在叢集環境中,要在kafka的 server.properties中配置zookeeper叢集地址等資訊,最重要的是num.partitions=3。這樣3個分割槽會分佈到3臺機器上(一個分割槽對應一臺機器),所以當kafka發訊息的時候,訊息就會分散發送到每臺機器上。
就是因為這個才踩到一個坑:叢集配置好了,建立topic時設定了3個分割槽、2個備份,但是發現訊息只發到了一臺機器上,只有一臺機器上有這個topic的log檔案。
雖然說這樣依舊能實現多消費者的負載均衡(所有消費者都是從收到訊息的這臺機器上拉取訊息),但是叢集的一個重要特點是容錯性;在這種情況下,這臺機器炸了之後整個系統就沒辦法執行了,更別說負載均衡了。
中央配置檔案:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Root Spring context: loads placeholder values, then imports the Kafka consumer and producer definitions. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Resolves ${...} placeholders from config.properties on the classpath -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:config.properties</value>
            </list>
        </property>
    </bean>

    <!-- Two consumer definitions and one producer definition, each kept in its own file -->
    <import resource="kafka/consumer1.xml"/>
    <import resource="kafka/consumer2.xml"/>
    <import resource="kafka/producer.xml"/>

</beans>
消費者1:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer 1: listens on topic test1 as a member of consumer group "1". -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Consumer parameters -->
    <bean id="consumerFactory1" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="1"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.LongDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- The class that actually consumes the messages -->
    <bean id="messageListernerConsumerService1" class="cn.wzy.JobConsumer1"/>

    <!-- Container properties: topic to subscribe to and the listener to invoke -->
    <bean id="containerProperties1" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="test1"/>
        <property name="messageListener" ref="messageListernerConsumerService1"/>
    </bean>

    <!--
        No init-method here: the container is a SmartLifecycle bean with auto-startup enabled by
        default, so the application context starts it through its public start() method on refresh.
        Pointing init-method at the protected doStart() hook bypasses that lifecycle entry point.
    -->
    <bean id="messageListenerContainer1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg ref="consumerFactory1"/>
        <constructor-arg ref="containerProperties1"/>
    </bean>

</beans>
消費者2:一模一樣的配置,只是消費者處理類換了:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer 2: listens on topic test1 as a member of consumer group "1". -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Consumer parameters -->
    <bean id="consumerFactory2" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="1"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.LongDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- The class that actually consumes the messages -->
    <bean id="messageListernerConsumerService2" class="cn.wzy.JobConsumer2"/>

    <!-- Container properties: topic to subscribe to and the listener to invoke -->
    <bean id="containerProperties2" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="test1"/>
        <property name="messageListener" ref="messageListernerConsumerService2"/>
    </bean>

    <!--
        No init-method here: the container is a SmartLifecycle bean with auto-startup enabled by
        default, so the application context starts it through its public start() method on refresh.
        Pointing init-method at the protected doStart() hook bypasses that lifecycle entry point.
    -->
    <bean id="messageListenerContainer2" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg ref="consumerFactory2"/>
        <constructor-arg ref="containerProperties2"/>
    </bean>

</beans>
處理類則是簡單的輸出收到的訊息:
package cn.wzy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
/**
 * Message listener for the first consumer: writes every record it receives to standard output.
 */
public class JobConsumer1 implements MessageListener<Long, String> {

    /**
     * Prints the key and value of the delivered record, prefixed with this consumer's name.
     *
     * @param record the record handed to this listener
     */
    public void onMessage(ConsumerRecord<Long, String> record) {
        final Long key = record.key();
        final String payload = record.value();
        System.out.println("==JobConsumer1 received:" + key + " : " + payload);
    }
}
package cn.wzy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
/**
 * Message listener for the second consumer: writes every record it receives to standard output.
 */
public class JobConsumer2 implements MessageListener<Long, String> {

    /**
     * Prints the key and value of the delivered record, prefixed with this consumer's name.
     *
     * @param record the record handed to this listener
     */
    public void onMessage(ConsumerRecord<Long, String> record) {
        final Long key = record.key();
        final String payload = record.value();
        System.out.println("==JobConsumer2 received:" + key + " : " + payload);
    }
}
生產者:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer side: producer factory, producer listener and the KafkaTemplate used to send messages. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!--
        Producer parameters.
        group.id is intentionally absent: it is a consumer-only setting that the Kafka producer
        does not recognise and merely reports as an unknown configuration.
    -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="retries" value="10"/>
                <entry key="batch.size" value="16384"/>
                <entry key="linger.ms" value="1"/>
                <entry key="buffer.memory" value="33554432"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.LongSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- Listener for sent messages; wired into the template below as its producerListener -->
    <bean id="listener" class="cn.wzy.Listener"/>

    <!-- KafkaTemplate bean: inject it wherever messages need to be sent and call its send methods; the default topic is test1 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="test1"/>
        <property name="producerListener" ref="listener"/>
    </bean>

</beans>
# Comma-separated host:port list of Kafka brokers; referenced as ${bootstrap.servers} by the consumer and producer bean definitions.
bootstrap.servers=192.168.60.131:9092,192.168.60.132:9092,192.168.60.133:9092
測試類:
package cn.wzy;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.Random;
/**
 * Spring-backed test that publishes a batch of messages to the template's default topic.
 *
 * <p>The application context is loaded from {@code classpath:application.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:application.xml")
public class SendTest {

    /** Template injected from the Spring context; used to send to its default topic. */
    @Autowired
    private KafkaTemplate<Long, String> kafkaTemplate;

    /**
     * Sends the same payload under keys 1 through 49, then waits five seconds before returning.
     *
     * @throws InterruptedException if the final sleep is interrupted
     */
    @Test
    public void test() throws InterruptedException {
        long key = 1;
        while (key < 50) {
            kafkaTemplate.sendDefault(key, "message : hello world");
            key++;
        }
        // NOTE(review): presumably keeps the context alive long enough for the consumers to print what they receive.
        Thread.sleep(5000);
    }
}
結果: