
HBase Java API Operations

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

/**
 * Add the HBase-related jars to the classpath first!
 *
 * Create and delete tables (using HBaseAdmin).
 *
 * Insert a record, fetch a single record, scan all records (using HTable).
 *
 * @author ahu_lichang
 */
public class HBaseApp {

    private static final String TABLE_NAME = "table1";
    private static final String FAMILY_NAME = "family1";
    private static final String ROW_KEY = "rowkey1";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        /*
         * Settings required for any HBase operation.
         */
        conf.set("hbase.rootdir", "hdfs://hadoop0:9000/hbase");
        // Required when running from Eclipse; without it the client cannot locate the cluster.
        conf.set("hbase.zookeeper.quorum", "hadoop0");

        /*
         * Create the table.
         */
        HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
        /*
         * if (!hBaseAdmin.tableExists(TABLE_NAME)) {
         *     HTableDescriptor hTableDescriptor = new HTableDescriptor(TABLE_NAME);
         *     HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(FAMILY_NAME);
         *     hTableDescriptor.addFamily(hColumnDescriptor);
         *     hBaseAdmin.createTable(hTableDescriptor);
         * }
         */

        /*
         * Insert one record.
         */
        HTable hTable = new HTable(conf, TABLE_NAME);
        /*
         * Put put = new Put(ROW_KEY.getBytes());
         * put.add(FAMILY_NAME.getBytes(), "age".getBytes(), "25".getBytes());
         * hTable.put(put);
         */

        /*
         * Fetch a single record.
         */
        /*
         * Get get = new Get(ROW_KEY.getBytes());
         * Result result = hTable.get(get);
         * byte[] value = result.getValue(FAMILY_NAME.getBytes(), "age".getBytes());
         * // keyvalues={rowkey1/family1:age/1491571143625/Put/vlen=2/ts=0} 25
         * System.out.println(result + "\t" + new String(value));
         */

        /*
         * Scan all records.
         */
        Scan scan = new Scan();
        ResultScanner resultScanner = hTable.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] value = result.getValue(FAMILY_NAME.getBytes(), "age".getBytes());
            System.out.println(result + "\t" + new String(value));
        }

        hTable.close();

        /*
         * Delete the table.
         */
        /*
         * hBaseAdmin.disableTable(TABLE_NAME);
         * hBaseAdmin.deleteTable(TABLE_NAME);
         */
    }

}

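The listing above uses the old HBaseAdmin/HTable client API, which was deprecated in HBase 1.x and later removed. For reference, here is a minimal sketch of the same put operation on the newer Connection-based API; the class name HBaseAppModern is made up for illustration, and the table, family, and quorum settings are the ones from the example above:

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseAppModern {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop0");
        // Connections are heavyweight; create one and share it across tables.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("table1"))) {
            Put put = new Put(Bytes.toBytes("rowkey1"));
            // addColumn replaces the deprecated Put.add(family, qualifier, value)
            put.addColumn(Bytes.toBytes("family1"), Bytes.toBytes("age"), Bytes.toBytes("25"));
            table.put(put);
        }
    }
}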

Batch Import into HBase with MapReduce
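The job below expects tab-separated log lines whose first field is a timestamp in milliseconds and whose second field is the phone number (msisdn). The mapper formats the timestamp and builds the rowkey as msisdn:yyyyMMddHHmmss, so all records for one phone number are stored together, sorted by time.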

package hbase;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class BatchImport {
    static class BatchImportMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {
        SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");
        Text v2 = new Text();

        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            try {
                final Date date = new Date(Long.parseLong(splited[0].trim()));
                final String dateFormat = dateformat1.format(date);
                // Rowkey: phone number + date/time
                String rowKey = splited[1] + ":" + dateFormat;
                v2.set(rowKey + "\t" + value.toString());
                context.write(key, v2);
            } catch (NumberFormatException e) {
                final Counter counter = context.getCounter("BatchImport",
                        "ErrorFormat");
                counter.increment(1L);
                System.out.println("Bad record: " + splited[0] + " " + e.getMessage());
            }
        }
    }

    static class BatchImportReducer extends
            TableReducer<LongWritable, Text, NullWritable> {
        protected void reduce(LongWritable key,
                java.lang.Iterable<Text> values, Context context)
                throws java.io.IOException, InterruptedException {
            for (Text text : values) {
                final String[] splited = text.toString().split("\t");

                final Put put = new Put(Bytes.toBytes(splited[0])); // first field: rowkey
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"),
                        Bytes.toBytes(splited[1])); // second field: date
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"),
                        Bytes.toBytes(splited[2])); // third field: phone number
                // Other fields omitted; call put.add(...) for each.
                context.write(NullWritable.get(), put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        final Configuration configuration = new Configuration();
        // Set the ZooKeeper quorum
        configuration.set("hbase.zookeeper.quorum", "hadoop0");
        // Set the HBase table name; create it in the shell first: create 'wlan_log','cf'
        configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
        // Raise this value to keep HBase from timing out
        configuration.set("dfs.socket.timeout", "180000");

        final Job job = new Job(configuration, "HBaseBatchImport");

        job.setMapperClass(BatchImportMapper.class);
        job.setReducerClass(BatchImportReducer.class);
        // Set the map output types; the reduce output types are not set
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        // No output path; set the output format class instead
        job.setOutputFormatClass(TableOutputFormat.class);

        // Upload the mobile internet access logs to the input directory in HDFS first
        FileInputFormat.setInputPaths(job, "hdfs://hadoop0:9000/input");

        job.waitForCompletion(true);
    }
}

After the code above runs successfully in Eclipse, you can check the results in the HBase shell.
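For example, running scan 'wlan_log', {LIMIT => 5} (or count 'wlan_log') in the shell should list the imported rows.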

[Screenshot: HBase shell output of the imported wlan_log table]
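Because the rowkeys have the form msisdn:yyyyMMddHHmmss, all records for a given phone number are contiguous and can be read back from Java with a bounded scan. Below is a minimal sketch under the same cluster settings as above; the phone number 13800000000 and the class name PhoneScan are made-up examples:

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class PhoneScan {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop0");

        HTable hTable = new HTable(conf, "wlan_log");
        Scan scan = new Scan();
        // Rowkeys are "msisdn:yyyyMMddHHmmss"; ':' + 1 == ';', so this
        // start/stop pair covers exactly one phone number's records.
        scan.setStartRow(Bytes.toBytes("13800000000:"));
        scan.setStopRow(Bytes.toBytes("13800000000;"));

        ResultScanner scanner = hTable.getScanner(scan);
        for (Result result : scanner) {
            byte[] date = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("date"));
            System.out.println(Bytes.toString(result.getRow()) + "\t" + Bytes.toString(date));
        }
        hTable.close();
    }
}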