java自定義物件傳送kafka
阿新 • • 發佈:2019-01-07
寫了一個小的專案,自定義java物件,傳送到kafka
kafka安裝搭建這裡就不再描述了,解壓後簡單配置即可
直接進入正題吧
一. 自定義java物件,並實現序列化,省略get,set方法
/**
 * Message payload exchanged through Kafka, encoded with Java's built-in object serialization.
 *
 * <p>Getters and setters are omitted from this listing.
 *
 * <p>NOTE(review): no explicit {@code serialVersionUID} is declared, so the JVM derives one from
 * the class shape. One is deliberately not added here because doing so would make previously
 * serialized payloads unreadable; decide on a value before the class evolves.
 */
public class Document implements Serializable {

    private String title;
    private String content;
    private String id;
    private String date;
    private long updatetime;

    /**
     * Serializes this document into a byte array using {@link ObjectOutputStream}.
     *
     * @return the serialized form of this document
     * @throws IllegalStateException if serialization fails; a truncated payload is never returned
     */
    public byte[] toBytes() {
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        // try-with-resources flushes and closes the object stream before the buffer is read.
        try (ObjectOutputStream oos = new ObjectOutputStream(bo)) {
            oos.writeObject(this);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to serialize Document", e);
        }
        return bo.toByteArray();
    }

    /**
     * Rebuilds a document from bytes previously produced by {@link #toBytes()}.
     *
     * <p>This method does not read any state of the receiver; it is an instance method only for
     * compatibility with existing callers.
     *
     * <p>SECURITY: this uses Java-native deserialization. Only feed it bytes from a trusted
     * topic; for untrusted input prefer a data-only format or an {@code ObjectInputFilter}.
     *
     * @param bytes serialized document, may be {@code null}
     * @return the decoded document, or {@code null} if {@code bytes} is {@code null} or cannot be
     *     decoded
     */
    public Document toDocument(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Document) ois.readObject();
        } catch (IOException | ClassNotFoundException ex) {
            // Keep the "null on failure" contract, but report the cause.
            java.util.logging.Logger.getLogger(Document.class.getName())
                    .log(java.util.logging.Level.WARNING, "Failed to deserialize Document", ex);
            return null;
        }
    }
}
二. 自定義Encoder
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.thunisoft.data.domain.Document;

/**
 * Kafka value serializer that turns a {@link Document} into bytes via {@code Document.toBytes()}.
 */
public class DocumentEncoder implements Serializer<Document> {

    /** No configuration is needed; both arguments are ignored. */
    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    /**
     * Serializes the given document.
     *
     * @param s the topic name (unused)
     * @param document the record value, may be {@code null}
     * @return the serialized bytes, or {@code null} when {@code document} is {@code null}
     */
    @Override
    public byte[] serialize(String s, Document document) {
        // A null value must map to a null payload instead of failing with a NullPointerException.
        return document == null ? null : document.toBytes();
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }
}
三. producer實現
import java.io.IOException; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.stereotype.Service; import com.thunisoft.data.domain.Document; import com.thunisoft.data.fy.api.kafka.DocumentProducer; import com.thunisoft.data.fy.api.kafka.domain.DocumentEncoder; import com.thunisoft.data.fy.constant.Constants; public class DocumentProducer { private static Properties props; private static KafkaProducer<String, Document> producer; static { if (props == null){ props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_METDATA_BROKERS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DocumentEncoder.class.getName()); //自定義分割槽 // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DocumentPartitioner.class.getName()); } producer = new KafkaProducer<String, Document>(props); } public void produce(Document document) throws IOException { producer.send(new ProducerRecord<String, Document>(Constants.TOPIC, document)); } }
四. 自定義Decoder
import com.bigdata.frame.data.Document;
import kafka.serializer.Decoder;
/**
 * Decoder for the old Kafka consumer API that rebuilds a {@link Document} from message bytes.
 *
 * <p>NOTE(review): this listing imports {@code com.bigdata.frame.data.Document}, while the
 * producer-side listings import {@code com.thunisoft.data.domain.Document}. Java serialization
 * records the fully-qualified class name, so both sides need the same class for decoding to
 * succeed — confirm the packages match in the real projects.
 */
public class DocumentDecoder implements Decoder<Document> {

    /**
     * Decodes one message payload.
     *
     * @param bytes the raw message value
     * @return whatever {@code Document.toDocument(bytes)} returns
     */
    @Override
    public Document fromBytes(byte[] bytes) {
        // toDocument is an instance method, so a blank Document serves only as the receiver.
        return new Document().toDocument(bytes);
    }
}
五. 編寫Consumer
import com.bigdata.frame.constant.Constants;
import com.bigdata.frame.kafka.DocumentConsumer;
import com.bigdata.frame.kafka.domain.DocumentDecoder;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.Properties;
/**
 * Holds the old-API (ZooKeeper-based) consumer configuration and lazily creates one shared
 * {@link ConsumerConnector}.
 */
public class DocumentConsumer {

    private static final Properties props = new Properties();
    private static ConsumerConnector consumer;

    static {
        // ZooKeeper connection string
        props.put("zookeeper.connect", Constants.ZOOKEERER_CONNECT);
        // Consumer group this client belongs to
        props.put("group.id", Constants.KAFKA_GROUP_ID);
        // Maximum time the client waits while connecting to ZooKeeper
        props.put("zookeeper.connection.timeout.ms", Constants.ZOOLEEPER_CONNECT_SESSION_TOMEOUT);
        // Keep rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", Constants.ZOOKEEPER_SESSION_TIMEOUT);
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        props.put("rebalance.max.retries", "5");
        props.put("rebalance.backoff.ms", "1200");
        // Serializer/decoder class.
        // NOTE(review): "serializer.class" looks like a producer-side setting of the old API and
        // may be ignored by the consumer — confirm.
        props.put("serializer.class", DocumentDecoder.class.getName());
    }

    /**
     * Returns the shared consumer connector, creating it on first use.
     *
     * @return the single {@link ConsumerConnector} shared by all instances of this class
     */
    public ConsumerConnector getConsumer() {
        // The connector is static, so guard the check-then-create on the class to avoid
        // building two connectors when called concurrently.
        synchronized (DocumentConsumer.class) {
            if (consumer == null) {
                consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            }
            return consumer;
        }
    }
}
六. 也可自定義partitioner規則
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
 * Sample custom partitioner that routes every record to a single, fixed partition.
 */
public class DocumentPartitioner implements Partitioner {

    /** Partition index returned for every record. */
    private static final int FIXED_PARTITION = 0;

    /**
     * Chooses the partition for a record. All arguments are ignored.
     *
     * <p>Positionally the parameters follow Kafka's {@code Partitioner.partition} signature:
     * topic, key, serialized key, value, serialized value, cluster metadata.
     *
     * @return always {@code 0}
     */
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return FIXED_PARTITION;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /** No configuration is read; the argument is ignored. */
    @Override
    public void configure(Map<String, ?> map) {
    }
}
七. 測試程式碼(第一個 main 測試 Producer,第二個 main 測試 Consumer)
/**
 * Producer smoke test: builds one {@code Document} and publishes it.
 *
 * @param args unused
 * @throws java.io.IOException declared because {@code DocumentProducer.produce} declares it
 */
public static void main(String[] args) throws java.io.IOException {
    DocumentProducer producer = new DocumentProducer();
    Document document = new Document();
    document.setTitle("測試");
    document.setContent("這是Producer測試");
    producer.produce(document);
}
/**
 * Consumer smoke test: opens one stream on the topic and prints every decoded document.
 *
 * <p>{@code topic} and {@code consumer} are not defined in this listing; presumably they are
 * fields of the enclosing class (a topic name and a {@code ConsumerConnector}) — confirm.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Request a single stream for the topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    DocumentDecoder valueDecoder = new DocumentDecoder();
    Map<String, List<KafkaStream<String, Document>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    KafkaStream<String, Document> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<String, Document> it = stream.iterator();
    while (it.hasNext()) {
        Document document = it.next().message();
        // println(Object) prints "null" for an undecodable message instead of throwing a
        // NullPointerException, so one bad payload does not stop the loop.
        System.out.println(document);
    }
}