1. 程式人生 > Kafka的多工執行緒消費測試

Kafka的多工執行緒消費測試

程式碼:

package com.weichai.kafka;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * Worker task that reads every message from one Kafka stream and prints it
 * to standard output, prefixed with the worker's thread number.
 *
 * @author lhy
 * @date 2018.10.09
 */
public class Consumer implements Runnable {

	/** Stream of raw key/value byte messages this worker reads from. */
	private final KafkaStream<byte[], byte[]> stream;

	/** Number used only to label this worker's console output. */
	private final int threadNumber;

	/**
	 * Creates a worker bound to a single stream.
	 *
	 * @param stream       the Kafka stream to drain
	 * @param threadNumber label printed in front of every message
	 */
	public Consumer(KafkaStream<byte[], byte[]> stream, int threadNumber) {
		this.stream = stream;
		this.threadNumber = threadNumber;
	}

	/**
	 * Prints each message of the stream until the iterator reports that no
	 * more messages are available, then prints a shutdown notice.
	 */
	@Override
	public void run() {
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
		while (it.hasNext()) {
			// Decode with an explicit charset: new String(byte[]) uses the platform
			// default, which makes the output depend on the machine it runs on.
			// NOTE(review): assumes producers publish UTF-8 text - confirm.
			String message = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
			System.out.println("Thread " + threadNumber + ": " + message);
		}
		System.out.println("Shutting down Thread: " + threadNumber);
	}

}
package com.weichai.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Runs a Kafka high-level consumer on a fixed pool of worker threads, one
 * {@link Consumer} task per stream, to raise consumption throughput.
 *
 * @author lhy
 * @date 2018.10.09
 */
public class ConsumerThread {

	private final ConsumerConnector consumer;
	private final String topic;
	/** Worker pool; stays {@code null} until {@link #run(int)} has been called. */
	private ExecutorService executor;

	/**
	 * Connects a consumer for the given group to ZooKeeper.
	 *
	 * @param a_zookeeper ZooKeeper connect string, e.g. {@code localhost:2181}
	 * @param a_groupId   consumer group id
	 * @param a_topic     topic whose messages will be consumed
	 */
	public ConsumerThread(String a_zookeeper, String a_groupId, String a_topic) {
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
		this.topic = a_topic;
	}

	/**
	 * Shuts down the connector and the worker pool, waiting up to five
	 * seconds for the workers to finish. Safe to call even if
	 * {@link #run(int)} was never invoked.
	 */
	public void shutdown() {
		if (consumer != null) {
			consumer.shutdown();
		}
		if (executor == null) {
			// run() was never called, so there is no pool to stop or wait for.
			// (Previously awaitTermination was called unguarded and threw a
			// NullPointerException in this case.)
			return;
		}
		executor.shutdown();
		try {
			if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
				System.out.println("消費者執行緒等待超時,直接退出!");
			}
		} catch (InterruptedException e) {
			System.out.println("系統異常中斷,直接退出!");
			e.printStackTrace();
			// Restore the interrupt flag so callers can still observe the interruption.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Requests {@code a_numThreads} streams for the topic and submits one
	 * {@link Consumer} task per returned stream to a fixed thread pool of
	 * the same size.
	 *
	 * @param a_numThreads number of streams to request and pool threads to create
	 */
	public void run(int a_numThreads) {
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, Integer.valueOf(a_numThreads));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

		executor = Executors.newFixedThreadPool(a_numThreads);

		int threadNumber = 0;
		for (final KafkaStream<byte[], byte[]> stream : streams) {
			executor.submit(new Consumer(stream, threadNumber));
			threadNumber++;
		}
	}

	/**
	 * Builds the consumer configuration from the connection settings plus
	 * fixed timeout and commit-interval values.
	 *
	 * @param a_zookeeper ZooKeeper connect string
	 * @param a_groupId   consumer group id
	 * @return the configuration passed to the connector factory
	 */
	private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
		Properties props = new Properties();
		props.put("zookeeper.connect", a_zookeeper);
		props.put("group.id", a_groupId);
		// NOTE(review): a 400 ms session timeout looks very short for anything
		// but a local test setup - confirm before reusing elsewhere.
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");

		return new ConsumerConfig(props);
	}

	/**
	 * Demo entry point: consumes topic {@code SimpleNode} with five worker
	 * threads for five seconds, then shuts everything down.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		String zooKeeper = "localhost:2181";
		String groupId = "0";
		String topic = "SimpleNode";
		int threads = 5; // number of worker threads to start

		ConsumerThread thread = new ConsumerThread(zooKeeper, groupId, topic);
		thread.run(threads);
		try {
			Thread.sleep(5000); // let the workers consume for 5 seconds before stopping
		} catch (InterruptedException e) {
			e.printStackTrace();
			// Keep the interrupt visible; shutdown below still runs.
			Thread.currentThread().interrupt();
		}
		thread.shutdown();
	}
}

執行截圖:

此處重溫了併發程式設計的內容:使用執行緒池(ExecutorService)開啟5個執行緒,每個執行緒各自消費一個KafkaStream,同時消費同一個Topic,大大提高了Kafka消費Topic的效率。