1. 程式人生 > >kafka中生產者和消費者API

kafka中生產者和消費者API

actor 成功 edm icc per class 持久化 spout payment

使用idea實現相關API操作,先要再pom.xml重添加Kafka依賴:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

Kafka生產者API:

 1 package cn.itcast.storm.kafka.simple;
 2 
 3 import kafka.javaapi.producer.Producer;
 4 import kafka.producer.KeyedMessage;
 5 import kafka.producer.ProducerConfig;
 6 
 7 import java.util.Properties;
 8 import java.util.UUID;
 9 
10 /**
11  * 這是一個簡單的Kafka producer代碼
12  * 包含兩個功能:
13 * 1、數據發送 14 * 2、數據按照自定義的partition策略進行發送 15 * 16 * 17 * KafkaSpout的類 18 */ 19 public class KafkaProducerSimple { 20 public static void main(String[] args) { 21 /** 22 * 1、指定當前kafka producer生產的數據的目的地 23 * 創建topic可以輸入以下命令,在kafka集群的任一節點進行創建。 24 * bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
25 */ 26 String TOPIC = "orderMq"; 27 /** 28 * 2、讀取配置文件 29 */ 30 Properties props = new Properties(); 31 /* 32 * key.serializer.class默認為serializer.class 33 */ 34 props.put("serializer.class", "kafka.serializer.StringEncoder"); 35 /* 36 * kafka broker對應的主機,格式為host1:port1,host2:port2 37 */ 38 props.put("metadata.broker.list", "kafka01:9092,kafka02:9092,kafka03:9092"); 39 /* 40 * request.required.acks,設置發送數據是否需要服務端的反饋,有三個值0,1,-1 41 * 0,意味著producer永遠不會等待一個來自broker的ack,這就是0.7版本的行為。 42 * 這個選項提供了最低的延遲,但是持久化的保證是最弱的,當server掛掉的時候會丟失一些數據。 43 * 1,意味著在leader replica已經接收到數據後,producer會得到一個ack。 44 * 這個選項提供了更好的持久性,因為在server確認請求成功處理後,client才會返回。 45 * 如果剛寫到leader上,還沒來得及復制leader就掛了,那麽消息才可能會丟失。 46 * -1,意味著在所有的ISR都接收到數據後,producer才得到一個ack。 47 * 這個選項提供了最好的持久性,只要還有一個replica存活,那麽數據就不會丟失 48 */ 49 props.put("request.required.acks", "1"); 50 /* 51 * 可選配置,如果不配置,則使用默認的partitioner partitioner.class 52 * 默認值:kafka.producer.DefaultPartitioner 53 * 用來把消息分到各個partition中,默認行為是對key進行hash。 54 */ 55 props.put("partitioner.class", "cn.itcast.storm.kafka.MyLogPartitioner"); 56 // props.put("partitioner.class", "kafka.producer.DefaultPartitioner"); 57 /** 58 * 3、通過配置文件,創建生產者 59 */ 60 Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props)); 61 /** 62 * 4、通過for循環生產數據 63 */ 64 for (int messageNo = 1; messageNo < 100000; messageNo++) { 65 // String messageStr = new String(messageNo + "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey," + 66 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 67 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 68 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 69 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 70 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 71 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 72 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 73 // "用來配合自定義的MyLogPartitioner進行數據分發"); 74 75 /** 76 * 5、調用producer的send方法發送數據 77 * 註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發 78 */ 79 producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast")); 80 } 81 } 82 }

Kafka消費者API:

 1 package cn.itcast.storm.kafka.simple;
 2 
 3 import kafka.consumer.Consumer;
 4 import kafka.consumer.ConsumerConfig;
 5 import kafka.consumer.ConsumerIterator;
 6 import kafka.consumer.KafkaStream;
 7 import kafka.javaapi.consumer.ConsumerConnector;
 8 import kafka.message.MessageAndMetadata;
 9 
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Properties;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 
17 public class KafkaConsumerSimple implements Runnable {
18     public String title;
19     public KafkaStream<byte[], byte[]> stream;
20     public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
21         this.title = title;
22         this.stream = stream;
23     }
24     @Override
25     public void run() {
26         System.out.println("開始運行 " + title);
27         ConsumerIterator<byte[], byte[]> it = stream.iterator();
28         /**
29          * 不停地從stream讀取新到來的消息,在等待新的消息時,hasNext()會阻塞
30          * 如果調用 `ConsumerConnector#shutdown`,那麽`hasNext`會返回false
31          * */
32         while (it.hasNext()) {
33             MessageAndMetadata<byte[], byte[]> data = it.next();
34             String topic = data.topic();
35             int partition = data.partition();
36             long offset = data.offset();
37             String msg = new String(data.message());
38             System.out.println(String.format(
39                     "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
40                     title, topic, partition, offset, msg));
41         }
42         System.out.println(String.format("Consumer: [%s] exiting ...", title));
43     }
44 
45     public static void main(String[] args) throws Exception{
46         Properties props = new Properties();
47         props.put("group.id", "dashujujiagoushi");
48         props.put("zookeeper.connect", "zk01:2181,zk02:2181,zk03:2181");
49         props.put("auto.offset.reset", "largest");
50         props.put("auto.commit.interval.ms", "1000");
51         props.put("partition.assignment.strategy", "roundrobin");
52         ConsumerConfig config = new ConsumerConfig(props);
53         String topic1 = "orderMq";
54         String topic2 = "paymentMq";
55         //只要ConsumerConnector還在的話,consumer會一直等待新消息,不會自己退出
56         ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
57         //定義一個map
58         Map<String, Integer> topicCountMap = new HashMap<>();
59         topicCountMap.put(topic1, 3);
60         //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是對應的流
61         Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
62         //取出 `kafkaTest` 對應的 streams
63         List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);
64         //創建一個容量為4的線程池
65         ExecutorService executor = Executors.newFixedThreadPool(3);
66         //創建20個consumer threads
67         for (int i = 0; i < streams.size(); i++)
68             executor.execute(new KafkaConsumerSimple("消費者" + (i + 1), streams.get(i)));
69     }
70 }

kafka自定義patition:

 1 package cn.itcast.storm.kafka;
 2 
 3 import kafka.producer.Partitioner;
 4 import kafka.utils.VerifiableProperties;
 5 import org.apache.log4j.Logger;
 6 
 7 
 8 public class MyLogPartitioner implements Partitioner {
 9     private static Logger logger = Logger.getLogger(MyLogPartitioner.class);
10 
11     public MyLogPartitioner(VerifiableProperties props) {
12     }
13 
14     public int partition(Object obj, int numPartitions) {
15         return Integer.parseInt(obj.toString())%numPartitions;
16 //        return 1;
17     }
18 
19 }

kafka中生產者和消費者API