
Sending custom Kafka message bodies (objects and lists)

Earlier we set up a simple Kafka environment on Windows and verified it from the command line (for the setup, see: http://blog.csdn.net/u014104286/article/details/75040932). The next question is how to actually use Kafka. We start by sending a simple message; once that succeeds, how do we send a custom message class? And what if we want to send a collection? Below we solve these problems one by one.

Preparation:

1. A Kafka environment that has been set up and tested successfully, with the ZooKeeper and Kafka services started.

2. A working Maven project.

3. The Kafka dependency added to the project:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
That completes the preparation.

1. First, let's send our first message, whose type is String:

The Producer class that sends messages:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        //Assign topicName to string variable
        String topicName = "newtest001";
        //Create instance for properties to access producer configs
        Properties props = new Properties();
        //Assign localhost id
        props.put("bootstrap.servers", "localhost:9092");
        //Set acknowledgements for producer requests.
        props.put("acks", "all");
        //If the request fails, the producer can automatically retry
        props.put("retries", 0);
        //Specify buffer size in config
        props.put("batch.size", 16384);
        //Wait up to 1 ms so that several records can be batched into one request
        props.put("linger.ms", 1);
        //buffer.memory controls the total amount of memory available to the producer for buffering
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully");
        producer.close();
    }
}

The Consumer class that receives messages:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        //Kafka consumer configuration settings
        String topicName = "newtest001";
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //Kafka Consumer subscribes list of topics here.
        consumer.subscribe(Arrays.asList(topicName));

        //print the topic name
        System.out.println("Subscribed to topic " + topicName);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                //print the offset, key and value for the consumer records
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
The code above is adapted from: https://www.w3cschool.cn/apache_kafka/apache_kafka_simple_producer_example.html

Start the consumer class and leave it waiting for messages; then run the producer to send messages. The consumer receives them:


This shows that messages can be sent and received successfully. The messages above are all plain strings; what do we do if we need to send a JavaBean such as PerSon?

Looking back at the Producer and Consumer above, both take a few parameters when instantiated, among them:

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

These entries name the classes used to serialize and deserialize the message key and value. Looking at the sources, org.apache.kafka.common.serialization.StringSerializer implements org.apache.kafka.common.serialization.Serializer<T>, and org.apache.kafka.common.serialization.StringDeserializer implements org.apache.kafka.common.serialization.Deserializer<T>.
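For reference, a sketch of the two interfaces as they appear in the kafka-clients 0.10.x API (they live in separate source files in the org.apache.kafka.common.serialization package; shown together here for brevity):

import java.io.Closeable;
import java.util.Map;

public interface Serializer<T> extends Closeable {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);
    void close();
}

public interface Deserializer<T> extends Closeable {
    void configure(Map<String, ?> configs, boolean isKey);
    T deserialize(String topic, byte[] data);
    void close();
}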


So we just need to implement the serialization and deserialization interfaces ourselves:

The DecodeingKafka class:

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.ys.test.SpringBoot.zktest.util.BeanUtils;

/**
 * Value deserializer: turns the byte[] payload back into a Java object via BeanUtils.
 */
public class DecodeingKafka implements Deserializer<Object> {

	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
	}

	@Override
	public Object deserialize(String topic, byte[] data) {
		return BeanUtils.byte2Obj(data);
	}

	@Override
	public void close() {
	}
}

The EncodeingKafka class:

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.ys.test.SpringBoot.zktest.util.BeanUtils;

/**
 * Value serializer: converts a Java object into a byte[] payload via BeanUtils.
 */
public class EncodeingKafka implements Serializer<Object> {
	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
	}

	@Override
	public byte[] serialize(String topic, Object data) {
		return BeanUtils.bean2Byte(data);
	}

	/*
	 * Called when the producer calls close().
	 */
	@Override
	public void close() {
		System.out.println("EncodeingKafka is close");
	}
}
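As an aside, instead of passing the serializer class names as config strings, the client also offers constructor overloads that take serializer/deserializer instances directly; a minimal sketch, assuming props is the same Properties object as above minus the key/value (de)serializer entries:

// Supply the (de)serializers as instances rather than class-name strings.
Producer<String, Object> producer = new KafkaProducer<String, Object>(
        props, new org.apache.kafka.common.serialization.StringSerializer(), new EncodeingKafka());
KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(
        props, new org.apache.kafka.common.serialization.StringDeserializer(), new DecodeingKafka());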

Next we need to define how the JavaBean itself is serialized and deserialized; here we use ObjectOutputStream and ObjectInputStream. (More efficient serialization schemes are also worth considering.)

The BeanUtils class:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class BeanUtils {
	private BeanUtils() {}
	/**
	 * Serialize an object to a byte array.
	 * 
	 * @param obj the object to serialize (must implement Serializable)
	 * @return the resulting bytes, or null if serialization fails
	 */
	public static byte[] bean2Byte(Object obj) {
		byte[] bb = null;
		try (ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
			 ObjectOutputStream outputStream = new ObjectOutputStream(byteArray)){
			outputStream.writeObject(obj);
			outputStream.flush();
			bb = byteArray.toByteArray();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return bb;
	}
	/**
	 * Convert a byte array back into an Object.
	 * 
	 * @param bytes the serialized bytes
	 * @return the deserialized object, or null if deserialization fails
	 */
	public static Object byte2Obj(byte[] bytes) {
		Object readObject = null;
		try (ByteArrayInputStream in = new ByteArrayInputStream(bytes);
			 ObjectInputStream inputStream = new ObjectInputStream(in)){
			 readObject = inputStream.readObject();
		} catch (Exception e) {
			e.printStackTrace();
		} 
		return readObject;
	}
}
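Before wiring BeanUtils into Kafka, it is worth sanity-checking the round trip locally; a minimal sketch (String is used here simply because it already implements Serializable):

public class BeanUtilsTest {
    public static void main(String[] args) {
        byte[] bytes = BeanUtils.bean2Byte("hello kafka"); // serialize
        String copy = (String) BeanUtils.byte2Obj(bytes);  // deserialize
        System.out.println(bytes.length + " bytes -> " + copy);
    }
}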

PerSon.java:

import java.io.Serializable;

public class PerSon implements Serializable {
    private static final long serialVersionUID = 1L;
    private long userid;
    private String name;
    private int age;
    private String addr;
    private String eMail;
    private String userRole;
    private IDCard card;
    // set... get...
}
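The IDCard bean is not shown in the original; judging from the producer code and the console output below, a minimal reconstruction might look like this (field names inferred from the setCardid/setCardName calls, hence hypothetical):

import java.io.Serializable;

// Hypothetical sketch of the IDCard bean referenced by PerSon.
public class IDCard implements Serializable {
    private static final long serialVersionUID = 1L;
    private long cardid;
    private String cardName;
    // set... get... and toString() omitted, as with PerSon
}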

SimpleProducerPerson.java, the message producer:
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import com.ys.test.SpringBoot.model.IDCard;
import com.ys.test.SpringBoot.model.PerSon;

public class SimpleProducerPerson {
    public static void main(String[] args) throws Exception {

        //Assign topicName to string variable
        String topicName = "mypartition001";
        //Create instance for properties to access producer configs
        Properties props = new Properties();
        //Assign localhost id
        props.put("bootstrap.servers", "localhost:9092");
        //Set acknowledgements for producer requests.
        props.put("acks", "all");
        //If the request fails, the producer can automatically retry
        props.put("retries", 0);
        props.put("metadata.fetch.timeout.ms", 30000);
        //Specify buffer size in config
        props.put("batch.size", 16384);
        //Wait up to 1 ms so that several records can be batched into one request
        props.put("linger.ms", 1);
        //buffer.memory controls the total amount of memory available to the producer for buffering
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.ys.test.SpringBoot.zktest.encode.EncodeingKafka");
//      props.put("partitioner.class", "a class implementing Partitioner that routes each message to a partition chosen by your own algorithm, e.g. com.ys.test.SpringBoot.zktest.util.MyPartition");

        Producer<String, Object> producer = new KafkaProducer<String, Object>(props);

        for (int i = 0; i < 2; i++) {

            final int index = i;
            PerSon perSon = new PerSon();
            perSon.setAge(i);
            perSon.setAddr("My Producer TEST001" + i);
            perSon.setName("MyTest " + i);
            IDCard card = new IDCard();
            card.setCardName("MyTest" + i + "'s idCard");
            card.setCardid(10000000000L);
            perSon.setCard(card);

            List<PerSon> asList = Arrays.asList(perSon, perSon);
//          producer.send(new ProducerRecord<String, Object>(topicName, Integer.toString(i), asList));
//          producer.send(new ProducerRecord<String, Object>(topicName, Integer.toString(i), perSon));
            //the Callback makes this send asynchronous; a synchronous variant is sketched after this class
            producer.send(new ProducerRecord<String, Object>(topicName, Integer.toString(i), asList), new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (metadata != null) {
                        System.out.println(index + " sent successfully: checksum: " + metadata.checksum()
                                + " offset: " + metadata.offset() + " partition: " + metadata.partition()
                                + " topic: " + metadata.topic());
                    }
                    if (exception != null) {
                        System.out.println(index + " exception: " + exception.getMessage());
                    }
                }
            });
        }
        producer.close();
    }
}
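As noted in the comment above, the Callback makes the send asynchronous. send() also returns a Future<RecordMetadata>, so a synchronous send is just a matter of blocking on it; a minimal sketch, assuming the same producer, topicName and perSon as above:

// Blocking (synchronous) send: get() waits for the broker acknowledgement.
RecordMetadata metadata = producer.send(
        new ProducerRecord<String, Object>(topicName, "0", perSon)).get();
System.out.println("offset: " + metadata.offset() + " partition: " + metadata.partition());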

SimpleConsumerPerSon.java, the message consumer:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumerPerSon {
    public static void main(String[] args) throws Exception {

        String topicName = "mypartition001";
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "partitiontest05");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");

        //To receive custom objects, specify our own deserializer class for the value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka");

        //For plain String messages the built-in deserializer can be used instead:
//      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        @SuppressWarnings("resource")
        KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        //Kafka Consumer subscribes list of topics here.
        consumer.subscribe(Arrays.asList(topicName));
        //print the topic name
        System.out.println("Subscribed to topic " + topicName);

        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(100);
            for (ConsumerRecord<String, Object> record : records) {
                //print the whole consumer record (offset, key and value)
//              System.out.printf("offset = %d, key = %s, value = %s\n",
//                      record.offset(), record.key(), record.value().toString());
                System.out.println(record.toString());
            }
        }
    }
}

In the producer and consumer above, the key still uses the built-in String serializer and deserializer, but the value serializer and deserializer must be set to our own classes.
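Also note that because DecodeingKafka is declared as Deserializer<Object>, record.value() comes back as a plain Object; to actually work with the payload you have to cast it. A minimal sketch for the list case, inside the consumer's poll loop above (unchecked cast, assuming the producer sent a List<PerSon> and that PerSon has the usual getters):

@SuppressWarnings("unchecked")
List<PerSon> perSons = (List<PerSon>) record.value();
for (PerSon p : perSons) {
    System.out.println(p.getName());
}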

Run the consumer and then the producer and observe the results, first for sending a single object and then for sending a list:

Result of receiving a single PerSon object:

ConsumerRecord(topic = mypartition001, partition = 0, offset = 29, CreateTime = 1502457680160, checksum = 3691507046, serialized key size = 1, serialized value size = 391, key = 0, value = PerSon [userid=0, name=MyTest 0, age=0, addr=My Producer TEST0010, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest0's idCard]])
ConsumerRecord(topic = mypartition001, partition = 0, offset = 30, CreateTime = 1502457680175, checksum = 1443537499, serialized key size = 1, serialized value size = 391, key = 1, value = PerSon [userid=0, name=MyTest 1, age=1, addr=My Producer TEST0011, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest1's idCard]])

Result of receiving the asList collection:
ConsumerRecord(topic = mypartition001, partition = 0, offset = 31, CreateTime = 1502457689533, checksum = 3469353517, serialized key size = 1, serialized value size = 524, key = 0, value = [PerSon [userid=0, name=MyTest 0, age=0, addr=My Producer TEST0010, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest0's idCard]], PerSon [userid=0, name=MyTest 0, age=0, addr=My Producer TEST0010, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest0's idCard]]])
ConsumerRecord(topic = mypartition001, partition = 0, offset = 32, CreateTime = 1502457689552, checksum = 1930168239, serialized key size = 1, serialized value size = 524, key = 1, value = [PerSon [userid=0, name=MyTest 1, age=1, addr=My Producer TEST0011, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest1's idCard]], PerSon [userid=0, name=MyTest 1, age=1, addr=My Producer TEST0011, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest1's idCard]]])

With this in place, we can send and receive correctly whether the payload is a single object or a collection.