簡單實現kafka資料寫入hbase

簡單實現kafka資料寫入hbase

測試資料格式

19392963501,17816115082,2018-09-18 16:19:44,1431
19392963501,17816115082,2018-09-18 16:19:44,1431
14081946321,13094566759,2018-05-23 09:34:27,0610
13415701165,18939575060,2018-11-23 21:33:23,1031
15590483587,16303009156,2018-08-02 07:38:00,0487

消費者程式碼部分

package KafkaAndHbase;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.TreeSet;

import com.google.common.collect.Table;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;



public class Consumer {
    // Shared HBase configuration, initialised once when the class is loaded.
    static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        // ZooKeeper quorum hosting the HBase cluster metadata.
        conf.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03");
        // BUG FIX: HBase property keys are case-sensitive. The original key
        // "hbase.zookeeper.property.clientport" was silently ignored and the
        // connection only worked because 2181 is the default port.
        conf.set("hbase.zookeeper.property.clientPort","2181");
    }
    /**
     * Polls call records from the Kafka topic "test02" and writes each record
     * into the HBase table "phone" (column family "info").
     *
     * Expected record value format (CSV):
     *   call_phone,callee_phone,start_time,call_long
     *   e.g. 13690470615,18122332823,2018-03-21 06:36:32,0578
     *
     * @param args unused
     * @throws IOException if the HBase table cannot be opened or written to
     */
    public static void main(String[] args) throws IOException {

        Properties props = new Properties();
        // Kafka broker address; the full broker list is not required.
        props.put("bootstrap.servers", "192.168.147.136:9092");
        // Uncomment to consume from the earliest available offset:
        // props.put("auto.offset.reset","smallest");
        // Consumer group id.
        props.put("group.id", "ttrt");
        // Commit offsets automatically, once per second.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Keys and values are plain strings.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic(s); multiple topics may be passed at once.
        consumer.subscribe(Arrays.asList("test02"));

        // FIX: open the HBase table once, outside the poll loop. The original
        // constructed (and closed) a new HTable for every single record, paying
        // the full connection-setup cost per message. The unused HBaseAdmin
        // (created and never closed) has been removed.
        HTable table = new HTable(conf, "phone");

        while (true) {
            // Poll with a 100 ms timeout.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                String[] split = record.value().split(",");
                if (split.length < 4) {
                    // FIX: skip malformed lines instead of crashing the consumer
                    // with ArrayIndexOutOfBoundsException.
                    System.err.println("Skipping malformed record: " + record.value());
                    continue;
                }
                String call_phone = split[0];
                String callee_phone = split[1];
                String start_time = split[2];
                String call_long = split[3];
                // Row key: last 4 digits of the caller's phone + yyyyMM of the
                // call, prefixed with a salt bucket (0-5) to spread writes
                // across regions.
                String rowkey0 = call_phone.substring(7) + start_time.replaceAll("-", "").substring(0, 6);
                // FIX: String.hashCode() can be negative, so a plain % 6 could
                // produce buckets -5..-1; normalise into 0..5.
                int bucket = (rowkey0.hashCode() % 6 + 6) % 6;
                String rowkey = bucket + "_" + rowkey0;
                // BUG FIX: the original passed the unsalted rowkey0 to Put,
                // leaving the computed salted key unused and defeating the
                // region-spreading scheme above.
                Put put = new Put(Bytes.toBytes(rowkey));
                put.add(Bytes.toBytes("info"), Bytes.toBytes("call_phone"), Bytes.toBytes(call_phone));
                put.add(Bytes.toBytes("info"), Bytes.toBytes("callee_phone"), Bytes.toBytes(callee_phone));
                put.add(Bytes.toBytes("info"), Bytes.toBytes("start_time"), Bytes.toBytes(start_time));
                put.add(Bytes.toBytes("info"), Bytes.toBytes("call_long"), Bytes.toBytes(call_long));
                table.put(put);
            }
        }
    }