
Importing HDFS Data into an HBase Table

The MapReduce job below reads a space-separated flight-plan (FPL) text file from HDFS and writes each record into an HBase table through TableOutputFormat.

package Hbase;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/**
 * Batch import into HBase using MapReduce
 * @author 劉超
 */
public class FPL {

public static void createTable() throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
    // Required HBase connection configuration
    Configuration conf = HBaseConfiguration.create();
    String tableName = "FPL";           // table name
    String family_name = "conf";        // column family
    conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
    conf.set("hbase.zookeeper.property.dataDir", "/home/liuchao/Hbase/zookeeper_data");
    conf.set("hbase.zookeeper.quorum", "localhost");
    final HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
    if (!hbaseAdmin.tableExists(tableName)) {   // create the table only if it does not already exist
        HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
        HColumnDescriptor family = new HColumnDescriptor(family_name);
        tableDescriptor.addFamily(family);
        hbaseAdmin.createTable(tableDescriptor);
    }
}
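
/*
 * Optional helper, not part of the original listing: a sketch for dropping the
 * table so the import can be re-run from scratch during testing. It uses the
 * same deprecated HBaseAdmin API and the same table name assumed above.
 */
public static void dropTableIfExists() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
    conf.set("hbase.zookeeper.property.dataDir", "/home/liuchao/Hbase/zookeeper_data");
    conf.set("hbase.zookeeper.quorum", "localhost");
    final HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
    if (hbaseAdmin.tableExists("FPL")) {
        hbaseAdmin.disableTable("FPL");   // a table must be disabled before it can be deleted
        hbaseAdmin.deleteTable("FPL");
    }
}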

static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    Text v2 = new Text();
    protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
        // Each input line is a space-separated flight-plan record
        final String[] splited = value.toString().split(" ");
        try {
            // Pass the whole line through to the reducer unchanged
            v2.set(value);
            context.write(key, v2);
        } catch (NumberFormatException e) {
            // Malformed records are counted instead of failing the job
            final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
            counter.increment(1L);
            System.out.println("Bad record: " + splited[0] + " " + e.getMessage());
        }
    }
}

static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> {
    @SuppressWarnings("deprecation")
    protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException {
        for (Text text : values) {
            final String[] splited = text.toString().split(" ");
            String rowKey = splited[1];                     // the aircraft registration number serves as the row key
            final Put put = new Put(Bytes.toBytes(rowKey));
            // put.add(column family, qualifier, value); field 0 is the scheduled departure date
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("the time of taking off"), Bytes.toBytes(splited[0].trim()));
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("飛機國籍登記號"), Bytes.toBytes(splited[1].trim()));   // aircraft registration number
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("飛機型別"), Bytes.toBytes(splited[2].trim()));        // aircraft type
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("尾流等級"), Bytes.toBytes(splited[3].trim()));        // wake turbulence category
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("飛行計劃型別"), Bytes.toBytes(splited[4].trim()));     // flight plan type
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("飛行種類"), Bytes.toBytes(splited[5].trim()));        // type of flight
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("特殊機載裝置"), Bytes.toBytes(splited[6].trim()));     // special onboard equipment
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("起飛機場"), Bytes.toBytes(splited[7].trim()));        // departure aerodrome
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("預計起飛時間"), Bytes.toBytes(splited[8].trim()));     // estimated departure time
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("預計飛行時間"), Bytes.toBytes(splited[9].trim()));     // estimated flight time
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("著陸機場"), Bytes.toBytes(splited[10].trim()));       // destination aerodrome
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("備降機場"), Bytes.toBytes(splited[11].trim()));       // alternate aerodrome
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("真空速"), Bytes.toBytes(splited[12].trim()));         // true airspeed
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("預計巡航高度"), Bytes.toBytes(splited[13].trim()));    // planned cruising level
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("飛行路徑"), Bytes.toBytes(splited[14].trim()));       // route
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("駕駛員姓名"), Bytes.toBytes(splited[15].trim()));      // pilot name
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("飛行證書號"), Bytes.toBytes(splited[16].trim()));      // pilot licence number
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("駕駛員住址"), Bytes.toBytes(splited[17].trim()));      // pilot address
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("電話號碼"), Bytes.toBytes(splited[18].trim()));        // telephone number
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("飛行基地"), Bytes.toBytes(splited[19].trim()));        // home base
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("飛機載油量"), Bytes.toBytes(splited[20].trim()));      // fuel on board
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("登機人數"), Bytes.toBytes(splited[21].trim()));        // persons on board
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("最多載人數"), Bytes.toBytes(splited[22].trim()));      // maximum persons on board
            put.add(Bytes.toBytes("conf"), Bytes.toBytes("飛機顏色"), Bytes.toBytes(splited[23].trim()));        // aircraft colour
            // Remaining fields omitted; add them with further put.add(….) calls
            context.write(NullWritable.get(), put);
        }
    }
}

public static void main(String[] args) throws Exception {
    FPL.createTable();
    final Configuration configuration = new Configuration();
    // ZooKeeper and HBase settings
    configuration.set("hbase.zookeeper.property.dataDir", "/home/liuchao/Hbase/zookeeper_data");
    configuration.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
    configuration.set("hbase.zookeeper.quorum", "localhost");
    // Name of the target HBase table
    configuration.set(TableOutputFormat.OUTPUT_TABLE, "FPL");
    // Raise this value so HBase does not time out and abort during the import
    configuration.set("dfs.socket.timeout", "180000");
    @SuppressWarnings("deprecation")
    final Job job = new Job(configuration, "FPL");
    job.setJarByClass(FPL.class);

    job.setMapperClass(BatchImportMapper.class);     // map task
    job.setReducerClass(BatchImportReducer.class);   // reduce task
    // Only the map output types are set; the reduce output goes straight to HBase
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    // No output path is set; the output format writes directly to the HBase table
    job.setOutputFormatClass(TableOutputFormat.class);

    FileInputFormat.setInputPaths(job, "hdfs://Master:9000//usr/local/hadoop/飛行報文/FPL.txt");
    job.waitForCompletion(true);
}

}
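
After the job finishes, the import can be spot-checked with a small scan over the FPL table. The sketch below is not part of the original program; it assumes the same localhost configuration and the same pre-0.96 client API (HTable, Result.getValue) used above, and simply prints the row key and one column for the first few rows.

package Hbase;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

/** Verification sketch: prints a few rows imported into the FPL table. */
public class FPLScan {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "localhost");

        HTable table = new HTable(conf, "FPL");
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("conf"));   // the single column family created by FPL.createTable()

        ResultScanner scanner = table.getScanner(scan);
        int shown = 0;
        for (Result result : scanner) {
            String rowKey = Bytes.toString(result.getRow());
            String takeOff = Bytes.toString(result.getValue(
                    Bytes.toBytes("conf"), Bytes.toBytes("the time of taking off")));
            System.out.println(rowKey + " -> " + takeOff);
            if (++shown >= 10) {   // a handful of rows is enough for a spot check
                break;
            }
        }
        scanner.close();
        table.close();
    }
}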