1. 程式人生 > > kafka API消費資料,指定分區消費,分區,攔截器

kafka API消費資料,指定分區消費,分區,攔截器

producer傳送訊息,consumer消費訊息

public class producer1 {
    /**
     * Sends 99 messages ("hah0".."hah98") to topic "aa" and prints the
     * ack metadata (topic / offset / partition) for each successful send.
     */
    public static void main(String[] args) {
        Properties prop = new Properties();
        // 1. Kafka broker address
        prop.put("bootstrap.servers","192.168.232.132:9092");
        // 2. Wait for all in-sync replicas to acknowledge each send
        prop.put("acks","all");
        // 3. Number of retries on send failure (0 = no retry)
        prop.put("retries","0");
        // 4. Batch size in bytes
        prop.put("batch.size" ,"10241");
        // 5. Linger time (ms) to allow batching
        prop.put("linger.ms","5");
        // 6. Total producer memory buffer in bytes
        prop.put("buffer.memory","1234321");

        // 7. Keys and values must be serialized before sending
        prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);
        try {
            for (int i = 0; i < 99; i++) {
                producer.send(new ProducerRecord<String, String>("aa", "hah" + i), new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // BUG FIX: the original ignored send failures entirely;
                        // check the exception first, then report success metadata.
                        if (exception != null) {
                            exception.printStackTrace();
                        } else if (metadata != null) {
                            System.out.println(metadata.topic() + "..." + metadata.offset() + "..." + metadata.partition());
                        }
                    }
                });
            }
        } finally {
            // BUG FIX: close in finally so buffered records are flushed and
            // resources are released even if send(...) throws.
            producer.close();
        }
    }
}
public class Consumer1 {
	public static void main(String[] args) {
		//1.配置消費者屬性
		Properties prop = new Properties();
		
		//配置屬性
		//伺服器地址指定
		prop.put("bootstrap.servers", "192.168.232.132:9092");
		//配置消費者組
		prop.put("group.id", "g1");
		//配置是否自動確認offset
		prop.put("enable.auto.commit", "true");
		//序列化
		prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		
		//2.例項消費者
		final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
		
		//4.釋放資源 執行緒安全
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
			
			public void run() {
				if(consumer != null) {
					consumer.close();
				}
			}
		}));
		
		//訂閱訊息主題
		consumer.subscribe(Arrays.asList("aa"));
		
		//3.拉訊息 推push 拉poll
		while(true) {
			ConsumerRecords<String,String> records = consumer.poll(1000);
			//遍歷訊息
			for(ConsumerRecord<String,String> record:records) {
				System.out.println(record.topic() + "------" + record.value());
			}
			
		}
	}
}

設定分區與指定分區消費

public class Patition1 implements Partitioner{

	//設定
	public void configure(Map<String, ?> configs) {
	}

	//分割槽邏輯
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		
		return 1;
	}

	//釋放資源
	public void close() {
	}

}
public class Producer2 {
	/**
	 * Sends 99 messages to topic "yuandan" using the custom Patition1
	 * partitioner (every record goes to partition 1) and prints the ack
	 * metadata for each successful send.
	 */
	public static void main(String[] args) {
		// 1. Producer configuration
		Properties prop = new Properties();

		// Kafka broker address
		prop.put("bootstrap.servers", "192.168.232.132:9092");
		// Wait for all in-sync replicas to acknowledge each send
		prop.put("acks", "all");
		// Number of retries on send failure (0 = no retry)
		prop.put("retries", "0");
		// Batch size in bytes
		prop.put("batch.size", "10241");
		// Linger time (ms) to allow batching
		prop.put("linger.ms", "5");
		// Total producer memory buffer in bytes
		prop.put("buffer.memory", "12341235");
		// Keys and values must be serialized before sending
		prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// Custom partitioner class
		prop.put("partitioner.class", "com.itstar.kafka.kafka_producer.Patition1");
		
		// 2. Create the producer
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
		
		// 3. Send messages
		try {
			for(int i = 0;i<99;i++) {
				producer.send(new ProducerRecord<String, String>("yuandan", "hunterhenshuai" + i), new Callback() {
					
					public void onCompletion(RecordMetadata metadata, Exception exception) {
						// BUG FIX: the original ignored send failures entirely;
						// check the exception first, then report success metadata.
						if (exception != null) {
							exception.printStackTrace();
						} else if (metadata != null) {
							System.out.println(metadata.topic() + "----" + metadata.offset() + "----" + metadata.partition());
						}
					}
				});
			}
		} finally {
			// 4. BUG FIX: close in finally so buffered records are flushed
			// even if send(...) throws.
			producer.close();
		}
	}
}
public class consumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        //配置節點

        prop.put("bootstrap.servers","192.168.232.132:9092");
        //配置消費者組
        prop.put("group.id","tt1");
        //配置自動獲取確定offset
        prop.put("enable.auto.commit","true");
        //序列化
        prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        //例項化consumer
        final KafkaConsumer<String,String> consume = new KafkaConsumer<String, String>(prop);
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                if (consume!=null){
                    consume.close();
                }
            }
        }));
        TopicPartition pp = new TopicPartition("aa",1);
//這個是指定分割槽消費
//        consume.assign(Arrays.asList(p));
//      指定offset開始讀取
//        consume.seekToBeginning(Arrays.asList(p));
//列印分割槽
        List<PartitionInfo> parlist = consume.partitionsFor("aa");
        for(PartitionInfo p : parlist){
            System.out.println(p.toString());
        }
//消費所有分割槽的,新增到List,然後assign這個List
      List<TopicPartition> list = new ArrayList<TopicPartition>();
      for (PartitionInfo p : parlist){
          TopicPartition top = new TopicPartition("shengdan",p.partition());
        list.add(top);
     }

        consume.assign(Arrays.asList(pp));


        while (true){
            ConsumerRecords<String,String> records = consume.poll(1000);
            for(ConsumerRecord<String,String> record : records){
                System.out.println(record.topic() +"---"+record.value());
            }

        }



    }
}

攔截器

public class TimeInterceptor implements ProducerInterceptor<String, String>{

	//配置資訊
	public void configure(Map<String, ?> configs) {
	}
	//業務邏輯
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		
		return new ProducerRecord<String, String>(
				record.topic(), 
				record.partition(),
				record.key(),
				System.currentTimeMillis() + "-" + record.value());
	}
	//傳送失敗呼叫
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
	}
	//關閉資源
	public void close() {
	}
	

在producer裡

	// Register the interceptor in the producer's configuration.
	ArrayList<String> inList = new ArrayList<String>();
	// BUG FIX: the original read "com.itstare..." but the package used
	// elsewhere in this post is "com.itstar..." (see partitioner.class
	// above) — the typo would fail with ClassNotFoundException when the
	// producer is constructed.
	inList.add("com.itstar.kafka.interceptor.TimeInterceptor");
	prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, inList);

就可以使用攔截器,相當於一個過濾的作用