HBase 資料匯出至 HDFS:使用 MapReduce 程式統計 HBase 庫中 mingxing 表的男女數量
資料
zhangfenglun,M,20,13522334455,[email protected],23521472
chenfei,M,20,13684634455,[email protected],84545472
liyuchen,M,20,13522334255,[email protected],84765472
liuwei,M,20,13528734455,[email protected],84521765
liuyang,M,20,13522354455,[email protected],84231472
caofei,M,20,13735675455,[email protected]
zhaoxinkuan,M,20,13522334466,[email protected],84512472
gaoying,M,20,13454523455,[email protected],845212344
miaorongrong,F,18,13526234455,[email protected],84563457
huhaiyan,F,18,13522395455,[email protected],845217234
huangbo,F,18,18452346455,[email protected],2348466
lizhilong,M,20,13522134455,[email protected]
zhouyongqiang,M,20,13522324455,[email protected],42211472
lianxiaodong,M,20,13522388355,[email protected],333321472
yangkailei,M,20,13523364455,[email protected],894685672
tiaoyiyang,M,20,13522336683,[email protected],84525434
songweifeng,M,20,13522383545,[email protected],815521472
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class hbaseToHdfs {
// 表名
private static final String TABLE_NAME = "mingxing";
// 列名
private static final String COLUMN_SEX = "sex";
//zookeeper地址
private static final String ZK_CONNECT = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
static class CountMingxingSexMRMapper extends TableMapper<Text, IntWritable> {
/**
* key:rowkey value:map方法每執行一次接收到的一個引數,這個引數就是一個Result例項
* 這個Result裡面存的東西就是多個包含rowkey, family, qualifier, value, timestamp的cell
*/
@Override
protected void map(ImmutableBytesWritable key,
Result value,
Context context) throws IOException, InterruptedException {
Text mk=new Text();
IntWritable mv=new IntWritable();
//先拿到每個結果集 每個單元格的資料 這裡對應的就是一行資料
List<Cell> cells = value.listCells();
//迴圈遍歷每個單元格
for(Cell c:cells){
String sex = new String(CellUtil.cloneQualifier(c));
//取列名並判斷是否是“sex”
if(sex.equals(COLUMN_SEX)){
mk.set(sex);
mv.set(1);
context.write(mk, mv);
}
}
}
}
/**
* 輸入輸出的key-value型別
*/
static class CountMingxingSexMRReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
@Override
protected void reduce(Text key,
Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
long count = 0;
for(IntWritable lw : values){
count += lw.get(); //這兩種方法都可以
// count++;
}
context.write(key, new LongWritable(count));
}
}
public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create(); //載入配置檔案
conf.set("hbase.zookeeper.quorum", ZK_CONNECT); // zookeeper入口
==標記文字==
System.setProperty("HADOOP_USER_NAME", "hadoop"); //本地許可權設定
Job job = Job.getInstance(conf);
job.setJarByClass(Kaoshi.hbaseToHdfs.class);
/**
* 引數1:表名
* 引數2:scan物件 可以是全表掃描的 也可以加過濾
* 引數3:mapper對應的類
* 引數4:map的輸出的key的型別
* 引數5:map輸出的value的型別
* 引數6:job
* 引數7:是否需要新增依賴的jar包 這裡設定為false 避免jar包衝突
*/
Scan scan = new Scan();
/**
* TableMapReduceUtil:以util結尾:工具
* MapReduceFactory:以factory結尾,它是工廠類,最大作用就是管理物件的生成
*/
TableMapReduceUtil.initTableMapperJob(TABLE_NAME,
scan,
CountMingxingSexMRMapper.class,
Text.class,
IntWritable.class,
job);
job.setReducerClass(CountMingxingSexMRReducer.class); //reduce類
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// Path outputPath = new Path("/hbase_mingxing/output");
Path outputPath = new Path("hdfs://bd1805/mingxing_out01");
// Path outputPath = new Path("D:\\bigdata\\mingxing\\output"); //本地路徑
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job, outputPath);
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1); //程式正常退出,或非正常退出
}
}