
Batch loading data from HDFS into an HBase table

Import log data stored in HDFS into HBase.

Package the program as a jar and run it on the server with:

hadoop jar xxx.jar <package name>.<class name>

(for the code below, the class is hbase.test.HbaseImport)

The HBase client libraries must be on Hadoop's classpath. Add them in hadoop-env.sh:

export HADOOP_CLASSPATH=/usr/local/hbase/lib/*

package hbase.test;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;


public class HbaseImport {
	
	// Read the source data from HDFS, parse each line, and generate a rowkey
	static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
		SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, LongWritable, Text>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] split = line.split("\t"); // split each log line into fields
			// reformat the timestamp in the first column
			String dateStr = format.format(new Date(Long.parseLong(split[0])));
			// concatenate the phone number and the date to form the HBase rowkey
			String rowKey = split[1] + ":" + dateStr;
			// prepend the rowkey to the original line content to form the new value
			Text v2 = new Text();
			v2.set(rowKey + "\t" + line);
			// emit the original key and the new value to the reducer: each log line now carries the generated rowkey in front, otherwise unchanged
			context.write(key, v2);
		}
	}
	
	// Write the records passed from the map side into HBase
	static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
		public static final String COLUMN_FAMILY = "cf";
		public static final String COLUMN_NAME_RAW = "raw";
		public static final String COLUMN_NAME_REPORTTIME = "reportTime";
		public static final String COLUMN_NAME_MSISDN = "msisdn";
		public static final String COLUMN_NAME_APMAC = "apmac";
		public static final String COLUMN_NAME_ACMAC = "acmac";
		public static final String COLUMN_NAME_HOST = "host";
		public static final String COLUMN_NAME_SITETYPE = "siteType";
		public static final String COLUMN_NAME_UPPACKNUM = "upPackNum";
		public static final String COLUMN_NAME_DOWNPACKNUM = "downPackNum";
		public static final String COLUMN_NAME_UPPAYLOAD = "upPayLoad";
		public static final String COLUMN_NAME_DOWNPAYLOAD = "downPayLoad";
		public static final String COLUMN_NAME_HTTPSTATUS = "httpStatus";
		@Override
		protected void reduce(LongWritable k2, Iterable<Text> v2s,
				TableReducer<LongWritable, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {
			for (Text v2 : v2s) {
				String[] split = v2.toString().split("\t");
				String rowKey = split[0]; // the first field is now the rowkey
				
				Put put = new Put(rowKey.getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_RAW.getBytes(), v2.toString().getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_REPORTTIME.getBytes(), split[1].getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_MSISDN.getBytes(), split[2].getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_APMAC.getBytes(), split[3].getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_ACMAC.getBytes(), split[4].getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_HOST.getBytes(), split[5].getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_SITETYPE.getBytes(), split[6].getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_UPPACKNUM.getBytes(), split[7].getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_DOWNPACKNUM.getBytes(), split[8].getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_UPPAYLOAD.getBytes(), split[9].getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_DOWNPAYLOAD.getBytes(), split[10].getBytes());
				put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_HTTPSTATUS.getBytes(), split[11].getBytes());
				
				context.write(NullWritable.get(), put);
			}
		}
	}
	
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.rootdir", "hdfs://hadoop4:9000/hbase");
		conf.set("hbase.zookeeper.quorum", "hadoop4");
		conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log"); // name of the target HBase table
		conf.set("dfs.socket.tomeout", "180000");
		
		Job job = new Job(conf, HbaseImport.class.getSimpleName());
		// these two lines are required when the job is run from a packaged jar
		TableMapReduceUtil.addDependencyJars(job);
		job.setJarByClass(HbaseImport.class);
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TableOutputFormat.class);
		
		job.setMapperClass(BatchImportMapper.class);
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		
		job.setReducerClass(BatchImportReducer.class);
		
		FileInputFormat.setInputPaths(job, "hdfs://hadoop4:9000/data/wlan");
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
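
The job writes into the table wlan_log with column family cf and assumes that table already exists (it can also be created from the HBase shell with create 'wlan_log', 'cf'). Below is a minimal pre-creation sketch using the same old-style HBase client API as the job above; the class name WlanLogTableSetup is only illustrative, and on HBase 2.x both this HBaseAdmin constructor and the Put.add calls shown earlier are no longer available, so the exact calls depend on the cluster's HBase version.

package hbase.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Illustrative helper (not part of the import job): creates the target table
// "wlan_log" with column family "cf" before the MapReduce job is run.
public class WlanLogTableSetup {

	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "hadoop4");

		HBaseAdmin admin = new HBaseAdmin(conf);
		try {
			if (!admin.tableExists("wlan_log")) {
				HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("wlan_log"));
				desc.addFamily(new HColumnDescriptor("cf"));
				admin.createTable(desc);
			}
		} finally {
			admin.close();
		}
	}
}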