// 將Hdfs資料往Hbase表中匯入 — imports HDFS data into an HBase table
package Hbase;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/**
* HBASE結合MapReduce批量匯入
* @author 劉超
*/
public class FPL {
public static void createTable() throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ //配置 必須書寫 Configuration conf = HBaseConfiguration.create(); String tableName = "FPL"; //表名 String family_name = "conf"; //列族 conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase"); conf.set("hbase.zookeeper.property.dataDir","/home/liuchao/Hbase/zookeeper_data"); conf.set("hbase.zookeeper.quorum","localhost"); final HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); if(!hbaseAdmin.tableExists(tableName)){ //表若不存在,則建立表 HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor family = new HColumnDescriptor(family_name); tableDescriptor.addFamily(family); hbaseAdmin.createTable(tableDescriptor); } } static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ Text v2 = new Text(); protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { final String[] splited = value.toString().split(" "); try {
// System.out.println( splited[1]);
v2.set(value);
context.write(key, v2);
// System.out.println(key+” “+value);
// System.out.println(splited[0]);
} catch (NumberFormatException e) {
final Counter counter = context.getCounter(“BatchImport”, “ErrorFormat”);
counter.increment(1L);
System.out.println(“出錯了”+splited[0]+” “+e.getMessage());
}
};
}
static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
@SuppressWarnings("deprecation")
protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException {
for (Text text : values) {
final String[] splited = text.toString().split(" ");
String rowKey=splited[1];
System.out.println(splited[23]);
// System.out.println(“201309”+rowKey);
// System.out.println(splited[1]);
final Put put = new Put(Bytes.toBytes(rowKey)); //新增rowKey
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“the time of taking off”), Bytes.toBytes(splited[0].trim())); //新增列簇,列名,數值 預計起飛日期
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“飛機國籍登記號”), Bytes.toBytes(splited[1].trim())); //新增列簇,列名,數值
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“飛機型別”), Bytes.toBytes(splited[2].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“尾流等級”), Bytes.toBytes(splited[3].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“飛行計劃型別”), Bytes.toBytes(splited[4].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“飛行種類”), Bytes.toBytes(splited[5].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“特殊機載裝置”), Bytes.toBytes(splited[6].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“起飛機場”), Bytes.toBytes(splited[7].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“預計起飛時間”), Bytes.toBytes(splited[8].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“預計飛行時間”), Bytes.toBytes(splited[9].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“著陸機場”), Bytes.toBytes(splited[10].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“備降機場”), Bytes.toBytes(splited[11].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“真空速”), Bytes.toBytes(splited[12].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“預計巡航高度”), Bytes.toBytes(splited[13].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“飛行路徑”), Bytes.toBytes(splited[14].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“駕駛員姓名”), Bytes.toBytes(splited[15].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“飛行證書號”), Bytes.toBytes(splited[16].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“駕駛員住址”), Bytes.toBytes(splited[17].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“電話號碼”), Bytes.toBytes(splited[18].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“飛行基地”), Bytes.toBytes(splited[19].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“飛機載油量”), Bytes.toBytes(splited[20].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“登機人數”), Bytes.toBytes(splited[21].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“最多載人數”), Bytes.toBytes(splited[22].trim()));
put.add(Bytes.toBytes(“conf”), Bytes.toBytes(“飛機顏色”), Bytes.toBytes(splited[23].trim()));
//省略其他欄位,呼叫put.add(….)即可
context.write(NullWritable.get(), put);
// System.out.println(NullWritable.get()+” “+put);
}
};
}
public static void main(String[] args) throws Exception {
hangxingbaowen.createTable();
final Configuration configuration = new Configuration();
//設定zookeeper
configuration.set("hbase.zookeeper.property.dataDir", "/home/liuchao/Hbase/zookeeper_data");
configuration.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
configuration.set("hbase.zookeeper.quorum","localhost");
//設定hbase表名稱
configuration.set(TableOutputFormat.OUTPUT_TABLE, "FPL");
//將該值改大,防止hbase超時退出
configuration.set("dfs.socket.timeout", "180000");
@SuppressWarnings("deprecation")
final Job job = new Job(configuration,"FPL");
job.setMapperClass(BatchImportMapper.class); //設定mapper工作任務
job.setReducerClass(BatchImportReducer.class); //設定reducer工作任務
//設定map的輸出,不設定reduce的輸出型別
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
//不再設定輸出路徑,而是設定輸出格式型別
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.setInputPaths(job, "hdfs://Master:9000//usr/local/hadoop/飛行報文/FPL.txt");
job.waitForCompletion(true);
}
}