簡單實現kafka資料寫入hbase
阿新 • • 發佈:2018-12-15
測試資料格式
19392963501,17816115082,2018-09-18 16:19:44,1431
19392963501,17816115082,2018-09-18 16:19:44,1431
14081946321,13094566759,2018-05-23 09:34:27,0610
13415701165,18939575060,2018-11-23 21:33:23,1031
15590483587,16303009156,2018-08-02 07:38:00,0487
消費者程式碼部分
package KafkaAndHbase; import java.io.IOException; import java.util.Arrays; import java.util.Iterator; import java.util.Properties; import java.util.TreeSet; import com.google.common.collect.Table; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class Consumer { static Configuration conf = null; static { conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03"); conf.set("hbase.zookeeper.property.clientport","2181"); } public static void main(String[] args) throws IOException { Properties props = new Properties(); // 定義kakfa 服務的地址,不需要將所有broker指定上 props.put("bootstrap.servers", "192.168.147.136:9092"); //配置從頭消費資料 // props.put("auto.offset.reset","smallest"); // 制定consumer group props.put("group.id", "ttrt"); // 是否自動確認offset props.put("enable.auto.commit", "true"); // 自動確認offset的時間間隔 props.put("auto.commit.interval.ms", "1000"); // key的序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的序列化類 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 定義consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); // 消費者訂閱的topic, 可同時訂閱多個 // consumer.subscribe(Arrays.asList("first", "second","third")); consumer.subscribe(Arrays.asList("test02")); HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); while (true) { // 讀取資料,讀取超時時間為100ms ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { //Connection connection = ConnectionFactory.createConnection(conf); // Table table = connection.getTable(TableName.valueOf("testtable")); HTable table=new HTable(conf, "phone"); // value= 13690470615,18122332823,2018-03-21 06:36:32,0578 //call_phone call_name callee_phone callee_name start_time call_long String[] split = record.value().split(","); String call_phone =split[0]; String callee_phone=split[1]; String start_time= split[2]; String call_long= split[3]; String rowkey0 = split[0].substring(7)+start_time.replaceAll("-","").substring(0,6); int i = rowkey0.hashCode() % 6; String rowkey= i+"_"+rowkey0; Put put = new Put(Bytes.toBytes(rowkey0)); put.add(Bytes.toBytes("info"),Bytes.toBytes("call_phone"),Bytes.toBytes(call_phone)); put.add(Bytes.toBytes("info"),Bytes.toBytes("callee_phone"),Bytes.toBytes(callee_phone)); put.add(Bytes.toBytes("info"),Bytes.toBytes("start_time"),Bytes.toBytes(start_time)); put.add(Bytes.toBytes("info"),Bytes.toBytes("call_long"),Bytes.toBytes(call_long)); table.put(put); table.close(); // System.out.printf("offset = %d, key = %s, value = %s%n", put); } } }