程式人生 > hbase資料匯入hdfs中之(使用MapReduce程式設計統計hbase庫中的mingxing表中男女數量)

hbase資料匯入hdfs中之(使用MapReduce程式設計統計hbase庫中的mingxing表中男女數量)

資料
zhangfenglun,M,20,13522334455,[email protected],23521472
chenfei,M,20,13684634455,[email protected],84545472
liyuchen,M,20,13522334255,[email protected],84765472
liuwei,M,20,13528734455,[email protected],84521765
liuyang,M,20,13522354455,[email protected],84231472
caofei,M,20,13735675455,[email protected],84527642
zhaoxinkuan,M,20,13522334466,[email protected],84512472
gaoying,M,20,13454523455,[email protected],845212344
miaorongrong,F,18,13526234455,[email protected],84563457
huhaiyan,F,18,13522395455,[email protected],845217234
huangbo,F,18,18452346455,[email protected],2348466
lizhilong,M,20,13522134455,[email protected],845212312
zhouyongqiang,M,20,13522324455,[email protected],42211472
lianxiaodong,M,20,13522388355,[email protected],333321472
yangkailei,M,20,13523364455,[email protected],894685672
tiaoyiyang,M,20,13522336683,[email protected],84525434
songweifeng,M,20,13522383545,[email protected],815521472

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class hbaseToHdfs {
		// 表名
		private static final String TABLE_NAME = "mingxing";
		// 列名
		private static final String COLUMN_SEX = "sex";
		//zookeeper地址
		private static final String ZK_CONNECT = "hadoop01:2181,hadoop02:2181,hadoop03:2181";

	
		static class CountMingxingSexMRMapper extends TableMapper<Text, IntWritable> {
		
		/**
		 * key:rowkey value:map方法每執行一次接收到的一個引數,這個引數就是一個Result例項
		 * 這個Result裡面存的東西就是多個包含rowkey, family, qualifier, value, timestamp的cell
		 */
			@Override
			protected void map(ImmutableBytesWritable key, 
					Result value, 
					Context context) throws IOException, InterruptedException {
			Text mk=new Text();
			IntWritable mv=new IntWritable();
			//先拿到每個結果集   每個單元格的資料  這裡對應的就是一行資料
			List<Cell> cells = value.listCells();
			//迴圈遍歷每個單元格
			for(Cell c:cells){
				String sex = new String(CellUtil.cloneQualifier(c));
				//取列名並判斷是否是“sex”
				if(sex.equals(COLUMN_SEX)){
					mk.set(sex);
					mv.set(1);
					context.write(mk, mv);
					}
				}
			}
		}
		/**
		 * 輸入輸出的key-value型別
		 */
		static class CountMingxingSexMRReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
			@Override
			protected void reduce(Text key, 
					Iterable<IntWritable> values, Context context)
					throws IOException, InterruptedException {
				long count = 0;
				for(IntWritable lw : values){
						count += lw.get();		//這兩種方法都可以
//					count++;					
				}
				context.write(key, new LongWritable(count));
			}
		}

	public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
		
		Configuration conf = HBaseConfiguration.create();		//載入配置檔案
		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);		 	//	zookeeper入口
==標記文字==
		System.setProperty("HADOOP_USER_NAME", "hadoop");		//本地許可權設定

		Job job = Job.getInstance(conf);
		job.setJarByClass(Kaoshi.hbaseToHdfs.class);
		/**
		 * 引數1:表名
		 * 引數2:scan物件  可以是全表掃描的  也可以加過濾
		 * 引數3:mapper對應的類
		 * 引數4:map的輸出的key的型別
		 * 引數5:map輸出的value的型別
		 * 引數6:job
		 * 引數7:是否需要新增依賴的jar包 這裡設定為false  避免jar包衝突
		 */

		Scan scan = new Scan();
		/**
		 * TableMapReduceUtil:以util結尾:工具
		 * MapReduceFactory:以factory結尾,它是工廠類,最大作用就是管理物件的生成
		 */
		TableMapReduceUtil.initTableMapperJob(TABLE_NAME,
				scan, 
				CountMingxingSexMRMapper.class,
				Text.class, 
				IntWritable.class,
				job);
		job.setReducerClass(CountMingxingSexMRReducer.class);	//reduce類

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

//		Path outputPath = new Path("/hbase_mingxing/output");
		Path outputPath = new Path("hdfs://bd1805/mingxing_out01");
//		Path outputPath = new Path("D:\\bigdata\\mingxing\\output");	//本地路徑
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);

		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);		//程式正常退出,或非正常退出
	}

	

}