
Kafka: ZK+Kafka+Spark Streaming Cluster Setup (Part 13): Defining an Avro Schema, Sending an Avro Byte Stream with the Producer, and Receiving and Parsing It with the Consumer


References: "Using Avro-Encoded Messages in Kafka: Consumer" and "Using Avro-Encoded Messages in Kafka: Producer"

pom.xml

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>bijection-avro_2.10</artifactId>
            <version>0.9.2</version>
        </dependency>

These dependencies pull in the Avro packages we rely on, as well as the Kafka client API used below.

Before using Avro, we first need to define a schema. Schemas are usually written in JSON, and we do not need to define any corresponding classes by hand. In this article we will use the following schema:

{
    "fields": [
        { "name": "str1", "type": "string" },
        { "name": "str2", "type": "string" },
        { "name": "int1", "type": "int" }
    ],
    "name": "Iteblog",
    "type": "record"
}

The schema above defines a record type named Iteblog, containing two string fields and one int field. With the schema defined, we can parse it using the parser Avro provides:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);

The USER_SCHEMA variable here holds the schema defined above.
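
If you would rather keep the schema in a standalone .avsc file instead of an inline Java string, Schema.Parser can read it from a file as well. A minimal sketch (the path src/main/avro/iteblog.avsc is a hypothetical location, not from the original):

// Load the same schema from an external file; parse(File) throws IOException,
// so call this somewhere that can handle it
Schema schema = new Schema.Parser().parse(new java.io.File("src/main/avro/iteblog.avsc"));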

Once the schema is parsed, we need to serialize objects into byte arrays, and turn byte arrays back into objects. The API Avro provides for this is not particularly easy to use, so this article uses Twitter's open-source Bijection library to make these operations convenient. We first create an Injection object that converts records into byte arrays:

Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Now we can create a Record according to the schema defined earlier and use recordInjection to serialize it:

GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "My first string");
avroRecord.put("str2", "My second string");
avroRecord.put("int1", 42);
 
byte[] bytes = recordInjection.apply(avroRecord);
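
Before wiring this into Kafka, a quick round-trip sanity check (not in the original flow, but it uses the same invert() call the consumer relies on later):

// invert() is the inverse of apply(): bytes back to a GenericRecord
GenericRecord decoded = recordInjection.invert(bytes).get();
System.out.println(decoded.get("int1"));  // prints 42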

Producer implementation

With the introduction above, we can now use Avro to serialize the messages we want to send to Kafka:

package example.avro;

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class AvroKafkaProducer {
    public static final String USER_SCHEMA = 
             "{" 
            + "\"type\":\"record\"," 
            + "\"name\":\"Iteblog\"," 
            + "\"fields\":[" 
            + "  { \"name\":\"str1\", \"type\":\"string\" }," 
            + "  { \"name\":\"str2\", \"type\":\"string\" },"
            + "  { \"name\":\"int1\", \"type\":\"int\" }" 
            + "]}";
    public static final String TOPIC = "t-testavro";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        
        for (int i = 0; i < 1000; i++) {
            GenericData.Record avroRecord = new GenericData.Record(schema);
            avroRecord.put("str1", "Str 1-" + i);
            avroRecord.put("str2", "Str 2-" + i);
            avroRecord.put("int1", i);

            byte[] bytes = recordInjection.apply(avroRecord);

            ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(TOPIC, "" + i, bytes);
            producer.send(record);
            System.out.println(">>>>>>>>>>>>>>>>>>" + i);
        }

        producer.close();
        System.out.println("complete...");
    }
}
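
Note that the send() above is fire-and-forget. If you need delivery confirmation, the same producer API accepts a Callback as a second argument to send(); a sketch (the error handling shown is just one reasonable choice, not from the original code):

// Variant of the send() inside the loop, with a completion callback
producer.send(record, new org.apache.kafka.clients.producer.Callback() {
    @Override
    public void onCompletion(org.apache.kafka.clients.producer.RecordMetadata metadata,
                             Exception exception) {
        if (exception != null) {
            exception.printStackTrace();      // delivery failed
        } else {
            System.out.println("sent: partition=" + metadata.partition()
                    + " offset=" + metadata.offset());
        }
    }
});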

Because we use the Avro and Bijection libraries, we need to declare the following dependencies in the pom.xml file:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.0</version>
</dependency>
 
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>bijection-avro_2.10</artifactId>
  <version>0.9.2</version>
</dependency>

Reading Avro-formatted messages from Kafka

Reading Avro messages from Kafka is the same as reading messages of any other type: create the streams, then iterate over them. With the legacy high-level consumer API (pre-0.9) this looks like:

ConsumerConnector consumer = ...;
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream<byte[], byte[]> stream : streams) {
    ....
}
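
For completeness, here is a minimal sketch of draining one of these legacy streams (this assumes the old kafka.consumer high-level API; the setup elided above stays elided):

// Inside the for loop above: message() yields the raw Avro-encoded bytes
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    byte[] message = it.next().message();
    // hand `message` to recordInjection.invert(...), as shown below
}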

The key is how to turn the Avro byte array we read back into the data we want. Here we again use the schema parser introduced earlier:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

USER_SCHEMA above is the message schema introduced earlier. We create a recordInjection object, which uses the freshly parsed schema to deserialize the byte arrays back into the data we wrote:

GenericRecord record = recordInjection.invert(message).get();

We can then read back the individual fields with:

record.get("str1")
record.get("str2")
record.get("int1")

Consumer implementation with the Kafka 0.9.x+ API

package example.avro;

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class AvroKafkaConsumer {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
        props.put("group.id", "testgroup");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);

        consumer.subscribe(Collections.singletonList(AvroKafkaProducer.TOPIC));
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroKafkaProducer.USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    String info = String.format("topic = %s, partition = %d, offset = %d, key = %s, str1 = %s",
                            record.topic(), record.partition(), record.offset(), record.key(), genericRecord.get("str1"));
                    logger.info(info);
                }
            }
        } finally {
            consumer.close();
        }
    }

}
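
The while (true) loop above only exits when the process is killed. A common pattern (a sketch, not part of the original code) is to call consumer.wakeup() from a shutdown hook, which makes the blocked poll() throw WakeupException so the finally block can close the consumer cleanly:

// Sketch: clean shutdown for the poll loop (assumes `consumer` is final)
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        consumer.wakeup();                       // poll() will throw WakeupException
        try { mainThread.join(); } catch (InterruptedException ignored) { }
    }
});
try {
    while (true) {
        // poll and process records as above
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown; fall through to close()
} finally {
    consumer.close();
}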

Test:

producer:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
>>>>>>>>>>>>>>>>>>0
>>>>>>>>>>>>>>>>>>1
>>>>>>>>>>>>>>>>>>2
>>>>>>>>>>>>>>>>>>3
>>>>>>>>>>>>>>>>>>4
>>>>>>>>>>>>>>>>>>5
>>>>>>>>>>>>>>>>>>6
>>>>>>>>>>>>>>>>>>7
>>>>>>>>>>>>>>>>>>8
>>>>>>>>>>>>>>>>>>9
>>>>>>>>>>>>>>>>>>10
...
>>>>>>>>>>>>>>>>>>997
>>>>>>>>>>>>>>>>>>998
>>>>>>>>>>>>>>>>>>999
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
complete...

consumer:

[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4321, key = 165, str1 = Str 1-165
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4322, key = 166, str1 = Str 1-166
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4323, key = 167, str1 = Str 1-167
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4324, key = 168, str1 = Str 1-168
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4325, key = 169, str1 = Str 1-169
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4326, key = 170, str1 = Str 1-170
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4327, key = 171, str1 = Str 1-171
