1. 程式人生 > >kafka客戶端Producer和Consumer關於自定義訊息序列和反序列

kafka客戶端Producer和Consumer關於自定義訊息序列和反序列

一、背景

    最近在學習kafka相關的知識,正好遇到一個疑問,在寫demo的過程中發現,投遞的資料都是字串型別,那麼就想想在實際應用中應該會有大量的需求投遞自定義資料型別,那麼如何才能投遞自定義資料型別呢?這裡面就涉及到了kafka提供的介面序列化和反序列化的功能。

二、kafka訊息序列化和反序列化

先看個demo,寫個Producer客戶端,根據官方文件,需要先做一些配置,放到Properties中,這裡包括啥bootstrap.servers,等引數,就不詳解了,主要是用到兩個引數:

props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
添加了這麼兩個引數,其實這個是kafka封裝好的,把字串進行序列化,這就是為什麼在ProducerRecord中可以輸入字串進行傳輸。同時,我們在Consumer端也需要新增這樣兩個引數,這兩個引數是進行反序列化的作用的,就是接收到kafka傳遞給你的資料之後,進行反序列化操作。
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

新增以上兩個引數,就是為啥我們是用Producer和Consumer進行生產和接收資料了,並且是以字串的形式,那麼這兩個類到底實現了什麼功能呢,下面看一下...

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

以上這個類很簡單,就是實現Serializer介面,但是注意這個介面是org.apache.kafka.common.serialization包下面的,不是java下面的,這個介面提供了幾個方法,其中,我們比較關注的是serialize方法,可以看到這個方法有兩個引數,topic,就是我們在客戶端傳遞過來的引數,而data就是對應的資料.

ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC, values);

看到這個方法其實很簡單,就是把字串轉換成byte[]陣列,其實到最後,kafka接收和傳送的都是以位元組資料從形式。不信的話,我們看下反序列化的方法StringDeserializer

public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

看其中的deserialize方法,把對應的位元組陣列轉換成字串返回,這就是為啥我們能夠傳遞字串,而直接傳遞自定義型別就報錯的原因了。

那麼問題來了,我們怎麼傳遞自定義資料型別呢?

看到上面的這個流程,其實就很明顯了,我們寫一個自定義的類,比如說我們要傳遞一個Person物件,那麼我們就定義個Person物件的序列化和反序列化的類,並且實現Serializer介面,下面繼續看,首先定義個Person類

public class Person implements Serializable{

	private String id ;
	private String name ;
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	@Override
	public String toString() {
		return "Person [id=" + id + ", name=" + name + "]";
	}
	
}
接下來,我們自定義一個序列化的類:PersonUtilSerializer
public class PersonUtilSerializer  implements Serializer<Person>{

	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		
	}

	@Override
	public byte[] serialize(String topic, Person data) {
		
		return JSON.toJSONBytes(data);
	}

	@Override
	public void close() {
		// TODO Auto-generated method stub
		
	}

}
這個類的方法很簡單,就是把Person物件轉換成位元組資料,然後在定義一個反序列化物件PersonUtilDeserializer,具體的程式碼如下:
public class PersonUtilDeserializer implements Deserializer<Person> {

	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public Person deserialize(String topic, byte[] data) {
		
         	return JSON.parseObject(data, Person.class);
	}

	@Override
	public void close() {
		// TODO Auto-generated method stub
		
	}
程式碼都特別簡單,把物件轉成byte,把byte轉成物件。具體是使用過程就變成了,先看下Proceduer中

        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.java.kafka.PersonUtilSerializer");
	    Producer<String, Object> producer = new KafkaProducer<String, Object>(props);
	    int i=0;
	    while(true){
	    	i++;
	    	Person p = new Person();
	    	p.setId(i+"");
	    	p.setName("zhangsan-"+i);
	    	System.out.println(p.toString());
	    	ProducerRecord<String, Object> record = new ProducerRecord<String,Object>(TOPIC, p);
	    	producer.send(record);
	    
	    }
	       producer.close();
	}
是就很簡單,把value.serializer換成我們剛寫的這個類,然後傳遞Person物件進去,那麼在Consumer中,同樣的道理
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "com.java.kafka.PersonUtilDeserializer");

		KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);
		consumer.assign(Arrays.asList(TOPIC_PARTITION));
		consumer.seekToBeginning(Arrays.asList(TOPIC_PARTITION));
		while(true){
			ConsumerRecords<String, Object> records = consumer.poll(100);
			System.err.println("print the size of records ,size="+records.count());
			for(ConsumerRecord<String, Object> record:records){
                
                        Person p = (Person) record.value();
		        System.out.printf("offset = %d, value = %s", record.offset(), p.toString());
	                 System.out.println();
	       }
看到上面的把對應的value.deserializer換成對應的反序列化的類即可。此時在執行Consumer程式就能得到你要的輸出了
offset = 2537, value = Person [id={"id":"1"}, name={"name":"zhangsan-3"}]
offset = 2538, value = Person [id={"id":"3"}, name={"name":"zhangsan-4"}]

看到對應的結果了,因此要要採用自定義型別,其實只需要實現Serializer和Deserializer連個藉口即可,在serialize中進行序列化好反序列化操作。

那麼最後還有一個問題,本人在之前的測試過程中,採用了多個客戶端Producer進行生產資料,同時生產的資料型別也是不同的,有int,string和自定義型別,結果在使用Consumer進行接受資料的時候,始終無法得到收到資料,其實這個地方並非無法接收資料,而是在反序列化的過程失敗了,通過測試可以驗證的出來,本人經過下面的測試發現,
@Override
	public Person deserialize(String topic, byte[] data) {
		// TODO Auto-generated method stub
		System.err.println(topic+"-----"+new String(data));
		String str = new String(data);
		Person p = new Person();
	    p.setName(str);
	    return p;
//		return JSON.parseObject(data, Person.class);
	}
對反序列化的方法進行測試列印,因為不同的型別過來,全部要轉車Person型別自然是失敗的,但是如果你手動全部轉成字串型別,即可打印出來,結果如下:

offset = 2534, value = Person [id=null, name={"id":"10","name":"zhangsan-10"}]

offset = 2535, value = Person [id=null, name={"id":"1","name":"zhangsan-1"}]

offset = 2536, value = Person [id=null, name={"id":"2","name":"zhangsan-2"}]

offset = 2537, value = Person [id=null, name={"id":"3","name":"zhangsan-3"}]

offset = 2538, value = Person [id=null, name={"id":"4","name":"zhangsan-4"}]
這裡沒有設定id值,所有的值全部被賦值到names上面,因此可以說明在反序列化的時候失敗了。
因此,如果有不同生成者使用不同的型別進行傳遞資料,此時,可以採用自定義型別,在反序列化的時候進行判斷,轉車對應的 類,保證能夠接收資料,或者Proceduer採用打標的方式,標記自己的型別,方便Consumer進行解析。