1. 程式人生 > >kafka四 kafka producer的ack機制和kafka叢集

kafka四 kafka producer的ack機制和kafka叢集

一、前幾節講了簡單的資料傳送接收,忘了說一個點,就是producer傳送資料的時候怎麼保證資料成功傳送到kafka伺服器上。

org.apache.kafka.clients.producer.Producer的send()方法有三個過載,其中一個如下:

producer.send(new ProducerRecord<String, Object>(TOPIC, data), new Callback() {
					
					@Override
					public void onCompletion(RecordMetadata metadata, Exception e) {
						if (metadata!=null) {
							//success
							System.out.println("傳送成功");
						}else {
							//fail
							System.err.println("傳送失敗");
						}
						
					}
				});

1、這個方法有一個回撥函式,metadata引數不空則表明資料安全傳送成功,否則傳送失敗。

二、kafka叢集搭建。

2、這個部落格講的是不同伺服器上叢集搭建,如果在同一個伺服器上不同磁碟搭建叢集,該怎麼辦?

     kafka配置不變,zookeeper的配置ip一樣,埠改變即可。

server.1=192.168.131.130:2888:3888
server.2=192.168.131.130:2889:3889
server.3=192.168.131.130:2887:3880

3、如果不想搭建那麼多服務,只需搭建搭建單服務,多節點,那麼只需在單個kafka中將config/server.properties檔案複製幾份,如:config/server-1.properties,config/server-2.properties,

修改配置檔案,只需把broker.id,listeners,log.dir三個配置專案修改,broker.id一定不能一樣,這是節點的唯一標示。

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
修改配置檔案config/server-2.properties:
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

搭建完成後,實現Java程式碼的生產與消費。

1、生產端程式碼基本不變,bootstrap.servers=192.168.169.128:9092 主要是這個配置項配置叢集即可,主要是消費端,有兩種不同的消費方式,(1):直接去zookeeper服務中取資料,由zookeeper和kafka互動。(2):直接去對準kafka消費資料。

所需jar包可點此連結下載:

三、直接去zookeepe叢集服務中取資料:

初始化配置檔案

	public  Properties getProperties() {
			Properties propertie = new Properties();
			String groupId=type + "_" + prjcode;
			//準備連線kafka的引數,主要是zookeeper地址和topic名稱
			propertie.put("zookeeper.connect", "192.168.39.50:2181,192.168.39.50:2182");
                        //groupId,自己定義
			propertie.put(ConsumerConfig.GROUP_ID_CONFIG ,groupId) ;
			propertie.put("zookeeper.sync.time.ms", "2000");//zookeeper follower可以落後zookeeper leader的最大時間
			propertie.put("auto.offset.reset", "smallest");
	        //zookeeper中沒有初始化offset時,smallest:自動復位offset為smallest的offset
			 //largest:自動復位offset為largest的offset
			 //anything else:向consumer丟擲異常
			propertie.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//consumer向zookeeper提交offset的頻率,單位是秒
			propertie.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
			propertie.put("topic", "mytopic");
			return propertie;
		}

接收資料

public void get() {
		TOPIC = "mytopic"
		String sqlData="";
		KafkaStream<String, String> kafkastream =null;
		try {
			props = getProperties();
			ConsumerConnector connector = (ConsumerConnector) Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(props));
	        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
	        topicCountMap.put(props.getProperty("topic"), 1);
	        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
	        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
	        Map<String, List<KafkaStream<String, String>>> consumerMap = connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
	        List<KafkaStream<String, String>> streamList = consumerMap.get(props.getProperty("topic"));
	        //得到的資料流
	        kafkastream = streamList.get(0);
		} catch (Exception e) {
			e.printStackTrace();
		}
		while (kafkastream.iterator().hasNext()) {
			//獲取資料
			sqlData = kafkastream.iterator().next().message();
		}		
	}

重點:此方式獲取資料,好多方法屬於過時方法,不能用。比如這個方法:kafkastream.iterator().hasNext(),一旦資料流沒有資料,就會停在此處,不會往下執行。說是socket阻塞,反正不好用。就拿接收sql語句入庫來說吧,肯定要批量入庫。但是批量入庫規格訂好後,沒有達規格且資料流沒有資料也要入庫,這個時候這個方法就不行了,根本不會往下執行啊,程式停了。怎麼辦?

還好想到了方法,在專案中跑的是多執行緒,那麼需要再啟動配套的監聽執行緒監聽批量入庫儲存容器的狀態。

四、順帶說一下監聽執行緒。

1、先是一個被監聽執行緒。

import java.util.ArrayList;
import java.util.List;

public class DoThread implements Runnable{
	String name = "";
	public List<String> list=new ArrayList<>();
	public DoThread(String name) {
		this.name = name;
	}
	@Override
	public void run() {
		for (int i = 0; i < 100; i++) {
			list.add(name+"==="+i);
			System.out.println(name+"==="+i);
		}
		if ("wukong".equals(name)) {
			for (int i = 0; i < list.size(); i++) {
				list.remove(i);
				try {
					Thread.sleep(3000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
}

2、一個監聽執行緒

public class MonitorThread implements Runnable {
	DoThread name = null;

	public MonitorThread(DoThread name) {
		this.name = name;
	}

	@Override
	public void run() {
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		while (true) {
			System.err.println("====================" + name.name);
			System.err.println(name.name + " " + name.list.size());
			try {
				Thread.sleep(3000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

3、啟動類

public class StartThread {

	public static void main(String[] args) {
		DoThread wukong=new DoThread("wukong");
		DoThread bajie=new DoThread("bajie");
		DoThread shaceng=new DoThread("shaseng");
		new Thread(wukong).start();
		new Thread(bajie).start();
		new Thread(shaceng).start();
		new Thread(new MonitorThread(wukong)).start();
		new Thread(new MonitorThread(bajie)).start();
		new Thread(new MonitorThread(shaceng)).start();
	}

}

這個demo很好的應用到上面的sql監控。當資料流沒有資料的時候,被監聽執行緒會阻塞,監聽執行緒監聽sql儲存容器,只要容器內有資料就執行入庫,此時要注意執行緒安全問題,要加synchronized處理。

五、直接從kafka服務中消費資料,跟之前消費資料一樣,唯一變的就是

propertie.put("bootstrap.servers", ConfParser.bootstrap_servers);這個配置項配置成叢集即可。其他完全一樣。

希望能給大家帶來幫助。