1. 程式人生 > >Hbase 表的Rowkey設計避免資料熱點

Hbase 表的Rowkey設計避免資料熱點

一、案例分析

常見避免資料熱點問題的處理方式有:加鹽、雜湊、反轉等方法結合預分割槽使用。

由於目前原資料的第一欄位為時間戳,第二欄位為電話號碼,若直接以其作為Rowkey儲存,容易引起熱點問題。因此通過加隨機列、組合時間戳、欄位反轉等方式來設計Rowkey(本例程式碼採用「反轉後的手機號 + "|" + 格式化時間」的組合),以實現既能高效查詢、又能避免熱點問題。(由於案例資料量小,未進行預分割槽)

二、程式碼部分

  package beifeng.hadoop.hbase;

  import java.io.IOException;
  import java.nio.charset.StandardCharsets;
  import java.text.SimpleDateFormat;
  import java.util.Date;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.MasterNotRunningException;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
  import org.apache.hadoop.hbase.client.HBaseAdmin;
  import org.apache.hadoop.hbase.client.Mutation;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
  import org.apache.hadoop.hbase.mapreduce.TableReducer;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;
  import org.apache.hadoop.io.Text;

  /**
   * MapReduce job that loads tab-separated log records from HDFS into an HBase table.
   *
   * <p>Row key design principles followed here:
   * <ol>
   *   <li>keep the row key short;</li>
   *   <li>keep it unique (e.g. by adding a random / MD5 column);</li>
   *   <li>avoid creating data hotspots;</li>
   *   <li>serve as many query scenarios as possible.</li>
   * </ol>
   *
   * <p>The row key built by this job is {@code reversedPhoneNumber + "|" + formattedTime}.
   *
   * @author Administrator
   */
  public class LoadData extends Configured implements Tool {

      /** ZooKeeper quorum host used to reach the HBase cluster. */
      private static final String ZOOKEEPER_QUORUM = "beifeng01";

      /** Name of the HBase table the job writes to. */
      private static final String TABLE_NAME = "phoneLog";

      /** Column family holding every column written by the job. */
      private static final String COLUMN_FAMILY = "info";

      /** Input file used by {@link #main(String[])} when no path is given on the command line. */
      private static final String DEFAULT_INPUT = "hdfs://hbase/data/input/HTTP_20130313143750.data";

      /** Separator between the fields of one record. */
      private static final String FIELD_SEPARATOR = "\t";

      /** Counter group used to report records that were skipped as malformed. */
      private static final String COUNTER_GROUP = "LoadData";

      /** Encodes a string as UTF-8 so the stored bytes do not depend on the platform charset. */
      private static byte[] utf8(String text) {
          return text.getBytes(StandardCharsets.UTF_8);
      }

      /**
       * Builds the row key from the timestamp (first field) and the phone number (second field)
       * and emits {@code rowKey + "\t" + originalLine} under the unchanged input key.
       *
       * @author Administrator
       */
      public static class LoadDataMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

          /**
           * Converts the epoch-millisecond timestamp into a compact date string.
           * The pattern must contain minutes ("mm") and two-digit seconds ("ss"); the previous
           * pattern "yyyyMMddHHsss" dropped the minutes, so records from different minutes of
           * the same hour could collide on the same row key.
           */
          private final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

          /** Reused output value to avoid allocating one Text per record. */
          private final Text mapOutputValue = new Text();

          @Override
          protected void map(LongWritable key, Text value,
                  Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                  throws IOException, InterruptedException {
              String line = value.toString();
              String[] fields = line.split(FIELD_SEPARATOR);

              // The timestamp and the phone number are both required to build the row key.
              if (fields.length < 2) {
                  context.getCounter(COUNTER_GROUP, "MALFORMED_MAP_INPUT").increment(1);
                  return;
              }

              long timestampMillis;
              try {
                  timestampMillis = Long.parseLong(fields[0].trim());
              } catch (NumberFormatException e) {
                  // Skip and count the record instead of failing the whole task on one bad line.
                  context.getCounter(COUNTER_GROUP, "INVALID_TIMESTAMP").increment(1);
                  return;
              }

              // Convert the first field into the formatted time.
              String formattedDate = sdf.format(new Date(timestampMillis));
              // Reverse the phone number.
              String reversedPhoneNumber = new StringBuilder(fields[1]).reverse().toString();

              // Reversed phone number + "|" + time, followed by the complete original line.
              String rowKey = reversedPhoneNumber + "|" + formattedDate;
              mapOutputValue.set(rowKey + FIELD_SEPARATOR + line);
              context.write(key, mapOutputValue);
          }
      }

      /**
       * Turns every mapper output value into one {@link Put} against the output table.
       * The value layout is {@code rowKey \t field1 \t field2 ...}, i.e. the original record
       * shifted right by one position.
       */
      public static class LoadDataReuducer extends TableReducer<LongWritable, Text, NullWritable> {

          /** HBase column family, pre-encoded once. */
          private static final byte[] FAMILY = utf8(COLUMN_FAMILY);

          /**
           * Column qualifier for each position of the split value; {@code null} marks positions
           * that are not written as a column. Position 0 is the row key itself and position 2
           * (the phone number) is not written as a column by this reducer.
           * The qualifier spellings are part of the table schema and are kept unchanged.
           */
          private static final String[] QUALIFIERS = {
              null, "reportTime", null, "apmac", "acmac", "host", "siteType",
              "upPackNum", "downPackNum", "unPayLoad", "downPayLoad", "httpStatus"
          };

          @Override
          protected void reduce(LongWritable key, Iterable<Text> values,
                  Reducer<LongWritable, Text, NullWritable, Mutation>.Context context)
                  throws IOException, InterruptedException {
              for (Text value : values) {
                  // Limit -1 keeps trailing empty fields, so a record whose last columns are
                  // empty still has the expected number of positions.
                  String[] fields = value.toString().split(FIELD_SEPARATOR, -1);
                  if (fields.length < QUALIFIERS.length) {
                      context.getCounter(COUNTER_GROUP, "MALFORMED_REDUCE_INPUT").increment(1);
                      continue;
                  }

                  Put put = new Put(utf8(fields[0]));
                  for (int i = 0; i < QUALIFIERS.length; i++) {
                      if (QUALIFIERS[i] != null) {
                          put.add(FAMILY, utf8(QUALIFIERS[i]), utf8(fields[i]));
                      }
                  }
                  context.write(NullWritable.get(), put);
              }
          }
      }

      /**
       * Creates the given table with a single {@code info} column family.
       * If the table already exists it is disabled, deleted and created again, so all data
       * previously stored in it is lost.
       *
       * @param tableName name of the table to (re)create
       * @throws MasterNotRunningException if the HBase master cannot be reached
       * @throws ZooKeeperConnectionException if ZooKeeper cannot be reached
       * @throws IOException on any other failure while talking to HBase
       */
      public static void createTable(String tableName)
              throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
          Configuration conf = HBaseConfiguration.create();
          conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);

          TableName tName = TableName.valueOf(tableName);
          HTableDescriptor htd = new HTableDescriptor(tName);
          htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));

          // Close the admin when done so its connection to the cluster is released.
          try (HBaseAdmin admin = new HBaseAdmin(conf)) {
              if (admin.tableExists(tName)) {
                  System.out.println(tableName+"is exist,trying to recrate the table");
                  admin.disableTable(tName);
                  admin.deleteTable(tName);
              }
              admin.createTable(htd);
              System.out.println("create new table"+ " " + tableName);
          }
      }

      /**
       * Recreates the output table and runs the load job.
       *
       * @param args {@code args[0]} is the input path to load
       * @return 0 if the job succeeded, 1 if it failed, 2 if the input path is missing
       * @throws Exception if the table cannot be created or the job cannot be submitted
       */
      @Override
      public int run(String[] args) throws Exception {
          // Validate before touching HBase: createTable drops the existing table.
          if (args == null || args.length < 1) {
              System.err.println("Usage: " + getClass().getSimpleName() + " <input path>");
              return 2;
          }

          Configuration conf = this.getConf();
          conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
          conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);

          createTable(TABLE_NAME);

          Job job = Job.getInstance(conf, this.getClass().getSimpleName());
          job.setJarByClass(this.getClass());
          job.setNumReduceTasks(1);

          // map class
          job.setMapperClass(LoadDataMapper.class);
          job.setMapOutputKeyClass(LongWritable.class);
          job.setMapOutputValueClass(Text.class);

          // reduce class
          job.setReducerClass(LoadDataReuducer.class);
          job.setOutputFormatClass(TableOutputFormat.class);

          FileInputFormat.addInputPath(job, new Path(args[0]));

          return job.waitForCompletion(true) ? 0 : 1;
      }

      /**
       * Entry point. Uses the first command-line argument as the HDFS input path and falls
       * back to the built-in sample file when no argument is given.
       *
       * @param args optional input path
       * @throws Exception if the job fails to start
       */
      public static void main(String[] args) throws Exception {
          Configuration conf = HBaseConfiguration.create();

          // HDFS input location; only defaulted when the caller did not provide one.
          String[] jobArgs = (args == null || args.length == 0)
                  ? new String[] {DEFAULT_INPUT}
                  : args;

          int status = ToolRunner.run(conf, new LoadData(), jobArgs);
          System.exit(status);
      }
  }

執行完程式後,對表進行 scan,可以看到 Rowkey 的效果與設計一致。