Spark 讀取Hbase表資料並實現類似groupByKey操作
一、概述
程式執行環境很重要,本次測試基於:
hadoop-2.6.5
spark-1.6.2
hbase-1.2.4
zookeeper-3.4.6
jdk-1.8
廢話不多說了,直接上需求
Andy column=baseINFO:age, value=21
Andy column=baseINFO:gender, value=0
Andy column=baseINFO:telphone_number, value=110110110
Tom column=baseINFO:age, value=18
Tom column=baseINFO:gender, value =1
Tom column=baseINFO:telphone_number, value=120120120
如上表所示,將之用spark進行分組,達到這樣的效果:
[Andy,(21,0,110110110)]
[Tom,(18,1,120120120)]
需求比較簡單,主要是熟悉一下程式執行過程
二、具體程式碼
package com.union.bigdata.spark.hbase;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableSplit ;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java .function.Function2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple10;
import scala.Tuple2;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ReadHbase {
private static String appName = "ReadTable";
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
//we can also run it at local:"local[3]" the number 3 means 3 threads
sparkConf.setMaster("spark://master:7077").setAppName(appName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master");
conf.set("hbase.zookeeper.property.clientPort", "2181");
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("baseINFO"));
scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));
String scanToString = "";
try {
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
scanToString = Base64.encodeBytes(proto.toByteArray());
} catch (IOException io) {
System.out.println(io);
}
for (int i = 0; i < 2; i++) {
try {
String tableName = "VIPUSER";
conf.set(TableInputFormat.INPUT_TABLE, tableName);
conf.set(TableInputFormat.SCAN, scanToString);
//get the Result of query from the Table of Hbase
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(conf,
TableInputFormat.class, ImmutableBytesWritable.class,
Result.class);
//group by row key like : [(Andy,110,21,0),(Tom,120,18,1)]
JavaPairRDD<String, List<Integer>> art_scores = hBaseRDD.mapToPair(
new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, List<Integer>>() {
@Override
public Tuple2<String, List<Integer>> call(Tuple2<ImmutableBytesWritable, Result> results) {
List<Integer> list = new ArrayList<Integer>();
byte[] telphone_number = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
byte[] age = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
byte[] gender = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));
//the type of storage at Hbase is Byte Array,so we must let it be normal like Int,String and so on
list.add(Integer.parseInt(Bytes.toString(telphone_number)));
list.add(Integer.parseInt(Bytes.toString(age)));
list.add(Integer.parseInt(Bytes.toString(gender)));
return new Tuple2<String, List<Integer>>(Bytes.toString(results._1().get()), list);
}
}
);
//switch to Cartesian product
JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart = art_scores.cartesian(art_scores);
//use Row Key to delete the repetition from the last step "Cartesian product"
JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart2 = cart.filter(
new Function<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>, Boolean>() {
public Boolean call(Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> tuple2Tuple2Tuple2) throws Exception {
return tuple2Tuple2Tuple2._1()._1().compareTo(tuple2Tuple2Tuple2._2()._1()) < 0;
}
}
);
System.out.println("Create the List 'collect'...");
//get the result we need
List<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>> collect = cart2.collect();
System.out.println("Done..");
System.out.println(collect.size() > i ? collect.get(i):"STOP");
if (collect.size() > i ) break;
} catch (Exception e) {
System.out.println(e);
}
}
}
}
三、程式執行過程分析
1、spark自檢以及Driver和excutor的啟動過程
例項化一個SparkContext(若在spark2.x下,這裡初始化的是一個SparkSession物件),這時候啟動SecurityManager執行緒去檢查使用者許可權,OK之後建立sparkDriver執行緒,spark底層遠端通訊模組(akka框架實現)啟動並監聽sparkDriver,之後由sparkEnv物件來註冊BlockManagerMaster執行緒,由它的實現類物件去監測執行資源
2、zookeeper與Hbase的自檢和啟動
第一步順利完成之後由sparkContext物件去例項去啟動程式訪問Hbase的入口,觸發之後zookeeper完成自己的一系列自檢活動,包括使用者許可權、作業系統、資料目錄等,一切OK之後初始化客戶端連線物件,之後由Hbase的ClientCnxn物件來建立與master的完整連線
3、spark job 的執行
程式開始呼叫spark的action類方法,比如這裡呼叫了collect,會觸發job的執行,這個流程網上資料很詳細,無非就是DAGScheduler搞的一大堆事情,連帶著出現一大堆執行緒,比如TaskSetManager、TaskScheduler等等,最後完成job,返回結果集
4、結束程式
正確返回結果集之後,sparkContext利用反射呼叫stop()方法,這之後也會觸發一系列的stop操作,主要執行緒有這些:BlockManager,ShutdownHookManager,後面還有釋放actor的操作等等,最後一切結束,臨時資料和目錄會被刪除,資源會被釋放