1. 程式人生 > >Kafka訊息序列化和反序列化(上)

Kafka訊息序列化和反序列化(上)

Kafka Producer在傳送訊息時必須配置的引數為:bootstrap.servers、key.serializer、value.serializer。序列化操作是在攔截器(Interceptor)執行之後並且在分配分割槽(partitions)之前執行的。

首先我們通過一段示例程式碼來看下普通情況下Kafka Producer如何編寫:

public class ProducerJavaDemo {
    public static final String brokerList = "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("client.id", "hidden-producer-client-id-1");
        properties.put("bootstrap.servers", brokerList);

        Producer<String,String> producer = new KafkaProducer<String,String>(properties);

        while (true) {
            String message = "kafka_message-" + new Date().getTime() + "-edited by hidden.zhu";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);
            try {
                Future<RecordMetadata> future =  producer.send(producerRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.print(metadata.offset()+"    ");
                        System.out.print(metadata.topic()+"    ");
                        System.out.println(metadata.partition());
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

這裡採用的客戶端不是0.8.x.x時代的Scala版本,而是Java編寫的新Kafka Producer, 相應的Maven依賴如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

上面的程式中使用的是Kafka客戶端自帶的org.apache.kafka.common.serialization.StringSerializer,除了用於String型別的序列化器之外還有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long這幾種型別,它們都實現了org.apache.kafka.common.serialization.Serializer介面,此介面有三種方法:

  1. public void configure(Map<String, ?> configs, boolean isKey):用來配置當前類。
  2. public byte[] serialize(String topic, T data):用來執行序列化。
  3. public void close():用來關閉當前序列化器。一般情況下這個方法都是個空方法,如果實現了此方法,必須確保此方法的冪等性,因為這個方法很可能會被KafkaProducer呼叫多次。

下面我們來看看Kafka中org.apache.kafka.common.serialization.StringSerializer的具體實現,原始碼如下:

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

首先看下StringSerializer中的configure(Map<String, ?> configs, boolean isKey)方法,這個方法的執行是在建立KafkaProducer例項的時候呼叫的,即執行程式碼Producer<String,String> producer = new KafkaProducer<String,String>(properties)時呼叫,主要用來確定編碼型別,不過一般key.serializer.encoding或serializer.encoding都不會配置,更確切的來說在Kafka Producer Configs列表裡都沒有此項,所以一般情況下encoding的值就是UTF-8。serialize(String topic, String data)方法非常的直觀,就是將String型別的data轉為byte[]型別即可。

如果Kafka自身提供的諸如String、ByteArray、ByteBuffer、Bytes、Double、Integer、Long這些型別的Serializer都不能滿足需求,讀者可以選擇使用如Avro、JSON、Thrift、ProtoBuf或者Protostuff等通用的序列化工具來實現,亦或者是使用自定義型別的Serializer來實現。下面就以一個簡單的例子來介紹下如何自定義型別的使用方法。

假設我們要傳送的訊息都是Company物件,這個Company的定義很簡單,只有名稱name和地址address,具體如下:

public class Company {
    private String name;
    private String address;
    //省略Getter, Setter, Constructor & toString方法
}

接下去我們來實現Company型別的Serializer,即下面程式碼示例中的DemoSerializer。

package com.hidden.client;
public class DemoSerializer implements Serializer<Company> {
    public void configure(Map<String, ?> configs, boolean isKey) {}
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }
    public void close() {}
}

使用時只需要在Kafka Producer的config中修改value.serializer屬性即可,示例如下:

properties.put("value.serializer", "com.hidden.client.DemoSerializer");
//記得也要將相應的String型別改為Company型別,如:
//Producer<String,Company> producer = new KafkaProducer<String,Company>(properties);
//Company company = new Company();
//company.setName("hidden.cooperation-" + new Date().getTime());
//company.setAddress("Shanghai, China");
//ProducerRecord<String, Company> producerRecord = new ProducerRecord<String, Company>(topic,company);

示例中只修改了value.serializer,而key.serializer和value.serializer沒有什麼區別,如果有真實需要,修改以下也未嘗不可。

--------------------- 本文來自 朱小廝 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/u013256816/article/details/78657982?utm_source=copy