
Kafka Message Serialization and Deserialization (Part 2)


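Custom deserializer class:

A custom Avro schema structure needs a custom class to deserialize it on the consumer side. An instance of that deserializer class is passed in as an argument when the consumer is constructed.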

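import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
// Assumption: Avro 1.8.x, where these conversions are nested in TimeConversions.
import org.apache.avro.data.TimeConversions.DateConversion;
import org.apache.avro.data.TimeConversions.TimeConversion;
import org.apache.avro.data.TimeConversions.TimestampConversion;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class AvroWithSchemaSpecificDeser<T, E> implements Deserializer<T> {
    private Class<T> typeClass;
    private transient Schema schema;
    private String codecName;

    /**
     * Simple constructor
     *
     * @param pojoClassName The pojo class name to be deserialized
     * @param codecName The codec used for compression, if null, no compression is applied
     */
    @SuppressWarnings("unchecked")
    public AvroWithSchemaSpecificDeser(final String pojoClassName, final String codecName) {
        try {
            typeClass = (Class<T>) Class.forName(pojoClassName);
            // Avro-generated classes expose their schema as a public static SCHEMA$ field.
            schema = (Schema) typeClass.getField("SCHEMA$").get(null);
            this.codecName = codecName != null ? codecName : "null";
        } catch (ReflectiveOperationException | AvroRuntimeException ex) {
            // Class.forName/getField/get throw checked reflective exceptions, so catch those too.
            throw new IllegalStateException(
                String.format("Not able to initialize avro object. Details: %s", ex.getMessage()), ex);
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        SpecificData specificData = new SpecificData();
        // Conversions for date and time logical types
        specificData.addLogicalTypeConversion(new DateConversion());
        specificData.addLogicalTypeConversion(new TimeConversion());
        specificData.addLogicalTypeConversion(new TimestampConversion());
        DatumReader<T> datumReader = new SpecificDatumReader<>(null, schema, specificData);
        // try-with-resources closes the reader even when reading fails.
        try (DataFileReader<T> dataFileReader =
                 new DataFileReader<>(new SeekableByteArrayInput(data), datumReader)) {
            T pojoObject = typeClass.getDeclaredConstructor().newInstance();
            while (dataFileReader.hasNext()) {
                pojoObject = dataFileReader.next(pojoObject);
            }
            return pojoObject;
        } catch (Exception ex) {
            // The original built this exception but never threw it, silently hiding failures.
            throw new SerializationException(
                String.format("Error when deserializing byte[] to this class (%s) from this topic (%s)",
                    typeClass, topic), ex);
        }
    }
}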
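The constructor above hinges on one reflective step: load a class by name, then read its public static SCHEMA$ field. Below is a minimal JDK-only sketch of that step. The names UserSecurityResponseStub and SchemaFieldLookup are hypothetical and exist only for illustration, and the stub stores a String where an Avro-generated class would store an org.apache.avro.Schema.

```java
import java.lang.reflect.Field;

/** Hypothetical stand-in for an Avro-generated class (assumption: real ones hold a Schema here). */
class UserSecurityResponseStub {
    public static final String SCHEMA$ =
        "{\"type\":\"record\",\"name\":\"UserSecurityResponse\",\"fields\":[]}";
}

class SchemaFieldLookup {
    /** Loads the class by name and returns the value of its public static SCHEMA$ field. */
    static Object readStaticSchema(String className) {
        try {
            Class<?> type = Class.forName(className);
            Field field = type.getField("SCHEMA$"); // getField only sees public fields
            return field.get(null);                  // null target because the field is static
        } catch (ReflectiveOperationException ex) {
            // Covers ClassNotFoundException, NoSuchFieldException and IllegalAccessException.
            throw new IllegalStateException("Cannot read SCHEMA$ from " + className, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(readStaticSchema(UserSecurityResponseStub.class.getName()));
    }
}
```

Catching ReflectiveOperationException in one place is a design choice: all three checked failures mean the same thing to the caller, namely that the configured class name is unusable.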

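Creating the consumer object:

First, RunnableConsumer needs to create the Kafka consumer instance, passing in the consumer's property list and the deserializer objects. When the value deserializer instance is created below, only pojo_class_name is passed in; the codec is null, which means no compression codec is used.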

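// Requires java.lang.reflect.Constructor.
// Key deserializer: instantiated through its public no-arg constructor.
@SuppressWarnings("unchecked")
Deserializer<K> keyDeserClass = (Deserializer<K>) Class.forName(props.getProperty("key.deserializer"))
        .getDeclaredConstructor().newInstance();

// Value deserializer: the custom class takes (pojoClassName, codecName), so look up the
// (String, String) constructor; looking up a Map constructor would fail with NoSuchMethodException.
Class<?> cl = Class.forName(props.getProperty("value.deserializer"));
Constructor<?> cons = cl.getConstructor(String.class, String.class);
@SuppressWarnings("unchecked")
Deserializer<V> valueDeserClass =
        (Deserializer<V>) cons.newInstance(consumerConfig.get("pojo_class_name"), null);

consumer = new KafkaConsumer<>(props, keyDeserClass, valueDeserClass);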
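The reflective construction of the value deserializer can be tried out without Kafka on the classpath. The sketch below uses only the JDK and assumes a stand-in class with the same (String, String) constructor shape as the custom deserializer. StubDeser and ReflectiveFactory are hypothetical names introduced for this example.

```java
import java.lang.reflect.Constructor;

/** Hypothetical stand-in with the same constructor shape as the custom deserializer. */
class StubDeser {
    final String pojoClassName;
    final String codecName;

    public StubDeser(String pojoClassName, String codecName) {
        this.pojoClassName = pojoClassName;
        this.codecName = codecName != null ? codecName : "null";
    }
}

class ReflectiveFactory {
    /** Instantiates className through its public (String, String) constructor. */
    static <T> T create(String className, Class<T> expectedType,
                        String pojoClassName, String codecName) {
        try {
            // asSubclass fails fast with ClassCastException if the configured class has the wrong type.
            Class<? extends T> cl = Class.forName(className).asSubclass(expectedType);
            Constructor<? extends T> cons = cl.getConstructor(String.class, String.class);
            return cons.newInstance(pojoClassName, codecName);
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot instantiate " + className, ex);
        }
    }

    public static void main(String[] args) {
        StubDeser deser = create(StubDeser.class.getName(), StubDeser.class,
                "UserSecurityResponse", null);
        System.out.println(deser.pojoClassName + " / codec=" + deser.codecName);
    }
}
```

Using asSubclass instead of a raw cast avoids the unchecked warning and reports a misconfigured class name at construction time rather than at first use.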

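The consumer's props are read from a configuration server and look like the key-value pairs below. The key fields are bootstrap.servers, key.deserializer, value.deserializer, group.id, and pojo_class_name, which names the class to deserialize into.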

{
	security.protocol=SASL_PLAINTEXT,
	schema.registry.url=http://yourregistryurl.youcompany.com:8080,
	bootstrap.servers=yourbootstrap1.youcompany.com:7788, yourbootstrap2.youcompany.com:7788,
	key.deserializer=org.apache.kafka.common.serialization.LongDeserializer,	
	value.deserializer=com.youcompany.serialization.AvroWithSchemaSpecificDeser,
	client.id=20353@xxx,
	group.id=yourgroupid,
	pojo_class_name=UserSecurityResponse
}

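The second argument of the KafkaConsumer constructor is the key deserializer object, here the standard Kafka deserializer class LongDeserializer.

The third argument is the value deserializer object. When it is created through reflection, the pojo_class_name parameter has to be read from the configuration.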
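Because the consumer cannot be built without the key fields named earlier, it may be worth checking the loaded properties before constructing anything. The following is only a sketch: ConsumerConfigCheck and missingKeys are hypothetical names, and the required-key list simply mirrors the fields this article calls out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class ConsumerConfigCheck {
    // Assumption: these are the keys the article treats as essential.
    static final String[] REQUIRED_KEYS = {
        "bootstrap.servers", "key.deserializer", "value.deserializer",
        "group.id", "pojo_class_name"
    };

    /** Returns the required keys that are absent or blank, in declaration order. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "yourbootstrap1.youcompany.com:7788");
        props.setProperty("group.id", "yourgroupid");
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```

Failing early on a missing pojo_class_name gives a clearer message than the exception that Class.forName would raise later inside the deserializer.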

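Subscribing to and consuming messages:

Once the consumer object has been created, the consumer can be started from a thread pool. It subscribes to the specified topics and polls for messages; if any messages are fetched, they are handed to the listener through notify.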

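            consumer.subscribe(topics);
            ConsumerGroup.this.isRunning = true;

            while (true) {
                ConsumerRecords<K,V> records = null;
                try {
                    // processCommit is the author's helper (not shown); presumably it
                    // commits the offsets of already handled records asynchronously.
                    processCommit(SyncMode.ASYNC);

                    // If isPolling starts out false, the first pass returns immediately and
                    // later passes block until records arrive. Note that poll(long) is
                    // deprecated since kafka-clients 2.0 in favor of poll(Duration).
                    records = consumer.poll(isPolling ? Long.MAX_VALUE : 0);
                    if(records != null && records.count() > 0) {
                        listener.notify(records);
                    }
                } catch(WakeupException wex) {
                    // consumer.wakeup() from another thread interrupts a blocking poll.
                    LOGGER.trace("Got a WakeupException. Doing nothing. Exception Details:",wex);
                }
                isPolling = true;
            }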
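The control flow of the loop above (zero timeout on the first pass, blocking afterwards, notify only on non-empty batches) can be sketched without Kafka types. Everything in this example is a stand-in: PollLoopSketch is a hypothetical name, a LongFunction plays the role of consumer.poll, and a stop flag is added as an assumption, since the original loop runs forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongFunction;

class PollLoopSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    /** Timeouts handed to the poll function, recorded so the flow can be inspected. */
    final List<Long> timeoutsSeen = new ArrayList<>();

    /** Asks the loop to exit after the current pass. */
    void stop() {
        running.set(false);
    }

    /**
     * @param poll     stand-in for consumer.poll(timeout); returns the fetched batch
     * @param listener stand-in for listener.notify(records)
     */
    <R> void run(LongFunction<List<R>> poll, Consumer<List<R>> listener) {
        boolean isPolling = false;
        while (running.get()) {
            // First pass: do not block. Later passes: wait until something arrives.
            long timeout = isPolling ? Long.MAX_VALUE : 0L;
            timeoutsSeen.add(timeout);
            List<R> records = poll.apply(timeout);
            if (records != null && !records.isEmpty()) {
                listener.accept(records);
            }
            isPolling = true;
        }
    }

    public static void main(String[] args) {
        PollLoopSketch loop = new PollLoopSketch();
        int[] calls = {0};
        loop.<String>run(timeout -> {
            calls[0]++;
            if (calls[0] == 3) {
                loop.stop(); // simulate a shutdown request during the third poll
            }
            List<String> batch = new ArrayList<>();
            if (calls[0] == 2) {
                batch.add("record-1");
            }
            return batch;
        }, batch -> System.out.println("notified with " + batch));
    }
}
```

With a real KafkaConsumer, a stop request from another thread would also need consumer.wakeup() to break out of a blocking poll, which is where the WakeupException in the original loop comes from.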
