1. 程式人生 > >MapReduce程式設計實現txt檔案中的內容匯入HBase

MapReduce程式設計實現txt檔案中的內容匯入HBase

一、建立java專案。

寫入程式碼,如下:
  1. package translate1;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.conf.*;  
  4. import org.apache.hadoop.fs.Path;  
  5. import  org.apache.hadoop.mapreduce.*;  
  6. import org.apache.hadoop.io.LongWritable;  
  7. import  org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.mapreduce.lib.input.*;  
  9. import org.apache.hadoop.hbase.HBaseConfiguration;  
  10. import org.apache.hadoop.hbase.io.*;  
  11. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
  12. import org.apache.hadoop.hbase.client.Put;  
  13. import org.apache.hadoop.hbase.util.Bytes;  
  14. public
    class translate1 {      
  15.        publicstatic Job createSubmittableJob(Configuration conf, String[] args)throws IOException {  
  16.                        String tableName = args[0];  
  17.                        Path inputDir = new Path(args[1]);  
  18.                        @SuppressWarnings("deprecation")  
  19.                        Job job = new
     Job (conf, "hac_chapter2_recipe3");  
  20.                        job.setJarByClass(HourlyImporter.class);  
  21.                        FileInputFormat.setInputPaths(job, inputDir);   
  22.                       job.setMapperClass(HourlyImporter.class);  
  23.                       TableMapReduceUtil.initTableReducerJob(tableName, null, job);   
  24.                       job.setNumReduceTasks(0);  
  25.                       TableMapReduceUtil.addDependencyJars(job);  
  26.                       return job;   
  27.                               }   
  28.        publicstaticvoid main(String[] args)throws Exception {  
  29.                       Configuration conf = HBaseConfiguration.create();  
  30.                       Job job = createSubmittableJob(conf, args);  
  31.                       System.exit (job.waitForCompletion(true) ? 0 : 1);  
  32.                               }  
  33.                      }  
  34.       class HourlyImporter extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {  
  35.          privatelong ts;  
  36.          staticbyte[] family = Bytes.toBytes("n");  
  37.          @Override
  38.          protectedvoid setup(Context context) {   
  39.          ts = System.currentTimeMillis();   
  40.           }  
  41.                      publicstatic String change(String str,int n,boolean j){  
  42.                                if(str==null||str.length()>=n) return str;  
  43.                                 String s="";  
  44.                                for(int i=str.length();i<n;i++)  
  45.                                       s+="0";  
  46.                               if(j) return s+str;  
  47.                               elsereturn str+s;    
  48.                               }  
  49.                             @SuppressWarnings("deprecation")  
  50.                     publicvoid map(LongWritable offset, Text value, Context context)throws IOException {  
  51.                            try {  
  52.                                      String line = value.toString();  
  53.                                      String stationID = line.substring(04);  
  54.                                      String month = line.substring(57);  
  55.                                      String day = line.substring(79);  
  56.                                      String rowkey = stationID + month + day;  
  57.                                      byte[] bRowKey = Bytes.toBytes(rowkey);  
  58.                                      ImmutableBytesWritable rowKey =  new ImmutableBytesWritable(bRowKey);  
  59.                                      Put p = new Put(bRowKey);  
  60.                                      for (int i = 1; i < 4 ; i++) {  
  61.                                            String columnI ="v" + change(String.valueOf(i),2,true);   
  62.                                            int beginIndex = i * 2 + 8;  
  63.                                            String valueI =line.substring(beginIndex, beginIndex + 2).trim();  
  64.                                            p.add(family, Bytes.toBytes(columnI),ts, Bytes.toBytes(valueI));  
  65.                                             }  
  66.                                     context.write(rowKey, p);  
  67.                                }catch (InterruptedException e) {  
  68.                                     e.printStackTrace();   
  69.                                }   
  70.                           }   
  71.               }   

二、上傳txt檔案到HDFS系統

HDFS中的檔案內容為:

三、在HBase終端上建立HBase表格

建立時只需指定要建立表格的表名和列族名

四、配置java專案執行的引數

配置內容如圖:

五、執行結果

如圖: