
Sending Custom Java Objects to Kafka

I wrote a small project that sends a custom Java object to Kafka.

Installing and setting up Kafka is not covered here; unpacking the archive and a bit of simple configuration is all it takes.

Let's get straight to it.

1. Define the Java object and implement Serializable (getters and setters omitted)
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class Document implements Serializable {

    private String title;
    private String content;
    private String id;
    private String date;
    private long updatetime;

    // Serialize this object to a byte array with standard Java serialization
    public byte[] toBytes() {
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bo)) {
            oos.writeObject(this);
            oos.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return bo.toByteArray();
    }

    // Deserialize a byte array back into a Document; static, so callers
    // do not need a throwaway instance just to call it
    public static Document toDocument(byte[] bytes) {
        Document document = null;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            document = (Document) ois.readObject();
        } catch (IOException | ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        return document;
    }
}
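
A quick round-trip sanity check (a minimal sketch; setTitle/getTitle are among the omitted accessors):

Document original = new Document();
original.setTitle("hello");
byte[] payload = original.toBytes();           // serialize
Document copy = Document.toDocument(payload);  // deserialize
System.out.println(copy.getTitle());           // prints "hello"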

2. Custom Encoder (a Kafka Serializer for the producer)

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.thunisoft.data.domain.Document;

public class DocumentEncoder implements Serializer<Document> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, Document document) {
        // guard against null values (e.g. tombstone records)
        return document == null ? null : document.toBytes();
    }

    @Override
    public void close() {
        // nothing to release
    }
}
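
Serializer is the new producer client's interface. If you were instead on the old Scala producer (the one configured through serializer.class), the counterpart is kafka.serializer.Encoder; a sketch, with the class name assumed:

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

import com.thunisoft.data.domain.Document;

// Hypothetical encoder for the old Scala producer API
public class DocumentOldEncoder implements Encoder<Document> {

    // the old producer instantiates encoders reflectively via this constructor
    public DocumentOldEncoder(VerifiableProperties props) {
    }

    @Override
    public byte[] toBytes(Document document) {
        return document.toBytes();
    }
}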

3. Producer implementation
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.thunisoft.data.domain.Document;
import com.thunisoft.data.fy.api.kafka.domain.DocumentEncoder;
import com.thunisoft.data.fy.constant.Constants;

public class DocumentProducer {
    private static final Properties props = new Properties();
    private static final KafkaProducer<String, Document> producer;

    static {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_METADATA_BROKERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DocumentEncoder.class.getName());
        // custom partitioner (optional, see section 6)
//        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DocumentPartitioner.class.getName());
        producer = new KafkaProducer<String, Document>(props);
    }

    public void produce(Document document) throws IOException {
        producer.send(new ProducerRecord<String, Document>(Constants.TOPIC, document));
    }
}
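
send() is asynchronous and the code above discards its result. A variant (method name assumed, and getId() is one of the omitted getters) that uses the document id as the record key and attaches a delivery callback makes failures visible; it needs two extra imports, org.apache.kafka.clients.producer.Callback and org.apache.kafka.clients.producer.RecordMetadata:

// Hypothetical variant: keyed send with a delivery callback
public void produceWithCallback(Document document) {
    ProducerRecord<String, Document> record =
            new ProducerRecord<String, Document>(Constants.TOPIC, document.getId(), document);
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                e.printStackTrace(); // delivery failed
            } else {
                System.out.println("sent to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            }
        }
    });
}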

4. Custom Decoder (for the old high-level consumer)
import com.bigdata.frame.data.Document;

import kafka.serializer.Decoder;

public class DocumentDecoder implements Decoder<Document> {

    @Override
    public Document fromBytes(byte[] bytes) {
        // delegate to the static deserialization helper on Document
        return Document.toDocument(bytes);
    }
}
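
The Decoder above serves the old high-level consumer used in section 5. If you were instead on the new KafkaConsumer client (the counterpart of the producer in section 3), you would implement Deserializer; a sketch, class name assumed:

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.bigdata.frame.data.Document;

// Hypothetical deserializer for the new consumer API
public class DocumentDeserializer implements Deserializer<Document> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public Document deserialize(String topic, byte[] bytes) {
        return bytes == null ? null : Document.toDocument(bytes);
    }

    @Override
    public void close() {
    }
}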

5. Writing the Consumer
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

import com.bigdata.frame.constant.Constants;

public class DocumentConsumer {
    private static Properties props;
    private static ConsumerConnector consumer;

    static {
        props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", Constants.ZOOKEEPER_CONNECT);

        // group.id identifies the consumer group
        props.put("group.id", Constants.KAFKA_GROUP_ID);

        // maximum time the client waits when connecting to ZooKeeper
        props.put("zookeeper.connection.timeout.ms", Constants.ZOOKEEPER_CONNECTION_TIMEOUT);
        // ZooKeeper session timeout; keep
        // rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
        props.put("zookeeper.session.timeout.ms", Constants.ZOOKEEPER_SESSION_TIMEOUT);
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // start from the earliest available offset when the group has no committed offset
        props.put("auto.offset.reset", "smallest");

        props.put("rebalance.max.retries", "5");
        props.put("rebalance.backoff.ms", "1200");
        // note: the key/value decoders are not configured here; they are passed
        // to createMessageStreams() (see the test code in section 7)
    }

    public ConsumerConnector getConsumer() {
        if (consumer == null) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }
        return consumer;
    }

}
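
One thing the class above omits: the connector should be shut down when the application stops, so that offsets are committed and its partitions are released promptly. A small method you could add to DocumentConsumer (name assumed):

// Hypothetical addition to DocumentConsumer
public void shutdown() {
    if (consumer != null) {
        consumer.shutdown(); // commits offsets (with auto-commit enabled) and releases partitions
        consumer = null;
    }
}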
6. Optionally, define a custom partitioner rule
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class DocumentPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // stub: send everything to partition 0 (see the hashing sketch below)
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
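
Returning 0 pins every record to a single partition, which defeats Kafka's parallelism. A more realistic rule, sketched below, hashes the key bytes over the topic's partition count, roughly what the default partitioner does for keyed records (Utils.murmur2/Utils.toPositive live in org.apache.kafka.common.utils in recent clients; adjust for your version):

import java.util.List;

import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

// Sketch of a key-hashing partition() body
@Override
public int partition(String topic, Object key, byte[] keyBytes,
                     Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        return 0; // unkeyed records fall back to a fixed partition
    }
    // murmur2 hash of the key, mapped into [0, numPartitions)
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}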


7. Test code
Producer test:

public static void main(String[] args) throws IOException {
    DocumentProducer producer = new DocumentProducer();
    Document document = new Document();
    document.setTitle("test");
    document.setContent("This is a Producer test");
    producer.produce(document);
}
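
Because send() is asynchronous, a short-lived main like this one can exit before the record ever leaves the client buffer. A hypothetical close() added to DocumentProducer, called as the last line of the test, prevents that:

// Hypothetical addition to DocumentProducer
public void close() {
    producer.close(); // blocks until previously sent records have completed
}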

Consumer test:

public static void main(String[] args) {
    // obtain the connector and topic from the classes above
    ConsumerConnector consumer = new DocumentConsumer().getConsumer();
    String topic = Constants.TOPIC;

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1); // one stream for this topic

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    DocumentDecoder valueDecoder = new DocumentDecoder();
    Map<String, List<KafkaStream<String, Document>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    KafkaStream<String, Document> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<String, Document> it = stream.iterator();
    while (it.hasNext()) {
        Document document = it.next().message();
        System.out.println(document.toString());
    }
}
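
Note that hasNext() blocks indefinitely while no messages arrive. If the test should stop when the topic goes idle, the old consumer supports a timeout property; set it alongside the other properties in section 5:

// hasNext() then throws ConsumerTimeoutException after 10 s with no data
props.put("consumer.timeout.ms", "10000");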