大資料儲存---HBase常用介紹(中)
阿新 • • 發佈:2018-12-14
我們這裡主要介紹HBase的API
- 基礎API
- 封裝工具類
基礎API
- 建立表
- 新增資料
- 查詢資料的三種方式
- 掃描查詢
- get方式執行查詢
- 過濾查詢 PS:刪除表請通過shell命令進入客戶端刪除。
package com.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.protobuf.generated.FilterProtos; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import java.io.IOException; import java.util.List; public class HBaseProcess { //建立表 @Test public void creatHbase() throws Exception { //建立一個連線 Configuration entries = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(); ////建立一個表需要的管理許可權,他來建立表 Admin admin = connection.getAdmin(); //建立表名 TableName tableName = TableName.valueOf("user"); if (!admin.tableExists(tableName)) { //建立一個表 HTableDescriptor hTable = new HTableDescriptor(tableName); //新增列簇 hTable.addFamily(new HColumnDescriptor("base_info")); System.out.println("------------開始建立表------------"); //建立表 admin.createTable(hTable); } //新增資料 System.out.println("-----------------開始新增資料--------------------"); //獲取表 Table user = connection.getTable(TableName.valueOf("user")); //使用Put類 byte[] rewkey_10s = Bytes.toBytes("rewkey_10"); Put put = new Put(rewkey_10s); byte[] family = Bytes.toBytes("base_info"); byte[] nameField = Bytes.toBytes("username"); byte[] nameValue = Bytes.toBytes("zhangsan"); put.addColumn(family,nameField,nameValue); byte[] sexField = Bytes.toBytes("sex"); byte[] sexValue = Bytes.toBytes("1"); put.addColumn(family, sexField, sexValue); byte[] birField = Bytes.toBytes("birthday"); byte[] birValue = Bytes.toBytes("2014-07-10"); put.addColumn(family, birField, birValue); byte[] addrField = Bytes.toBytes("address"); byte[] addrValue = Bytes.toBytes("北京市"); put.addColumn(family, addrField, addrValue); user.put(put); //獲取資料的三種方式 //方式一, Table user1 = connection.getTable(TableName.valueOf("user")); Get get = new Get(Bytes.toBytes("rewkey_10")); Result result = user1.get(get); List<Cell> cellList = result.listCells(); for (Cell cell : cellList) { System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "==> " + Bytes.toString(CellUtil.cloneFamily(cell)) + "{" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toString(CellUtil.cloneValue(cell)) + "}"); } user1.close(); //獲取方式2,全表掃描 System.out.println("-----------掃描全表的資料-------------"); Scan scan = new Scan(); /** * 新增資料篩選的範圍 */ scan.setStartRow(Bytes.toBytes("rowkey_10")); scan.setStopRow(Bytes.toBytes("rowkey_22")); Table user2 = connection.getTable(TableName.valueOf("user")); ResultScanner scanner = user2.getScanner(scan); Result result1 = null; while ((result1 = scanner.next())!=null){ List<Cell> cells1 = result1.listCells(); for (Cell cell : cells1) { // 列簇、列名、值、rowkey // 列印rowkey,family,qualifier,value System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "==> " + Bytes.toString(CellUtil.cloneFamily(cell)) + "{" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toString(CellUtil.cloneValue(cell)) + "}"); } } user1.close(); System.out.println("-------------------查詢住在北京的使用者----------------"); //定義好過濾器 ValueFilter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("北京市")); Scan scan1 = new Scan(); scan1.setFilter(filter); //定義好過濾器可以使用了 Table user3 = connection.getTable(TableName.valueOf("user")); ResultScanner scanner1 = user3.getScanner(scan1); Result result2=null; while ((result2=scanner1.next())!=null){ List<Cell> cells = result2.listCells(); for (Cell cell : cells) { System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "==> " + Bytes.toString(CellUtil.cloneFamily(cell)) + "{" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toString(CellUtil.cloneValue(cell)) + "}"); } } } }
抽取工具類
package com.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.util.Bytes; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; public class HBaseUtils { private static Connection connection; //建立表 public static void createTable(String tName, String... familyNames) throws Exception { Connection connection = getConnection(); Admin admin = connection.getAdmin(); //建立表名 TableName tableName = TableName.valueOf(tName); if (!admin.tableExists(tableName)) { //設定表明 HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); for (String familyName : familyNames) { //設定列族 hTableDescriptor.addFamily(new HColumnDescriptor(familyName)); } admin.createTable(hTableDescriptor); } admin.close(); } //新增資料 private static void putData(String tableName,String rowkey,String baseInfo,String Field,String Value) throws Exception { Table user = getConnection().getTable(TableName.valueOf(tableName)); //使用Put類 byte[] rewkey_10s = Bytes.toBytes(rowkey); Put put = new Put(rewkey_10s); byte[] family = Bytes.toBytes(baseInfo); byte[] nameField = Bytes.toBytes(Field); byte[] nameValue = Bytes.toBytes(Value); put.addColumn(family,nameField,nameValue); user.put(put); user.close(); } //查詢資料,全表掃描獲取資料,這裡可以設定過濾器 public static ArrayList<ArrayList<Map<String, String>>> scan (String tName, String startKey, String endKey, Filter filter) throws Exception { Table table = getConnection().getTable(TableName.valueOf(tName)); Scan scan = new Scan(); if (startKey != null && endKey != null) { scan.setStartRow(Bytes.toBytes(startKey)); scan.setStopRow(Bytes.toBytes(endKey)); } if (filter != null) { scan.setFilter(filter); } Result result = null; ResultScanner scanner = table.getScanner(scan); ArrayList<ArrayList<Map<String, String>>> arrayLists = new ArrayList<ArrayList<Map<String, String>>>(); while ((result = scanner.next()) != null) { ArrayList<Map<String, String>> value = getValue(result); arrayLists.add(value); } return arrayLists; } //get方式獲取資料 public static ArrayList<Map<String, String>> get(String tName, String rowkey, String cf, String field) throws Exception { // user,rowkey,cf:username:value Table table = getConnection().getTable(TableName.valueOf(tName)); Get get = new Get(Bytes.toBytes(rowkey)); if (cf != null) { get.addFamily(Bytes.toBytes(cf)); } if (field != null) { get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(field)); } Result result = table.get(get); return getValue( result); } //裝載獲取到的資料的方法 private static ArrayList<Map<String, String>> getValue( Result result) { ArrayList<Map<String, String>> maps = new ArrayList<Map<String, String>>(); List<Cell> cells = result.listCells(); for (Cell cell : cells) { HashMap<String, String> hashMap = new HashMap<String, String>(); hashMap.put("RowKey", Bytes.toString(CellUtil.cloneRow(cell))); hashMap.put("Family", Bytes.toString(CellUtil.cloneFamily(cell))); hashMap.put("Field", Bytes.toString(CellUtil.cloneQualifier(cell))); hashMap.put("Value", Bytes.toString(CellUtil.cloneValue(cell))); maps.add(hashMap); } return maps; } //單例模式獲取連線 private static Connection getConnection() throws Exception { if (connection==null){ Configuration entries = HBaseConfiguration.create(); //設定最大的連線數量 connection = ConnectionFactory.createConnection(entries,Executors.newFixedThreadPool(30)); } return connection; } }