Spark Learning - SparkSQL - 06 - Spark throws java.io.NotSerializableException when reading HBase data

1. Preparation. With HBase installed, start the HBase shell and create a table:

create 'table_name', 'column_family_1', 'column_family_2', ..., 'column_family_N'
create 'table_name', 'column_family'

In HBase, columns can be added dynamically, so a table only needs its column families up front (a programmatic equivalent is sketched after the command below).

create 'test_lcc_person','lcc_liezu'
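
For reference, the same table can also be created from Java. This is a minimal sketch, assuming the HBase 1.x client API (the same generation as the ProtobufUtil/Base64 helpers used in the Spark program later) and the ZooKeeper address used in that program:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTestLccPerson {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.10.82");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // One column family is enough; individual columns are created on the fly by each put.
            HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("test_lcc_person"));
            descriptor.addFamily(new HColumnDescriptor("lcc_liezu"));
            if (!admin.tableExists(descriptor.getTableName())) {
                admin.createTable(descriptor);
            }
        }
    }
}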

Then insert some data. Cells that share the same row key belong to the same record; there are 6 records in total (a Java-client equivalent is sketched after the shell commands below).
put 'table_name', 'rowkey (like a primary key in a relational database, must be unique)', 'column_family:column_name', 'value'

put 'test_lcc_person','1','lcc_liezu:name:','樑川川1'
put 'test_lcc_person','1','lcc_liezu:sex:','男' put 'test_lcc_person','1','lcc_liezu:age:','12' put 'test_lcc_person','2','lcc_liezu:name:','樑川川2' put 'test_lcc_person','2','lcc_liezu:sex:','男' put 'test_lcc_person','2','lcc_liezu:age:','12' put 'test_lcc_person','3','lcc_liezu:name:','樑川川3' put 'test_lcc_person'
,'3','lcc_liezu:sex:','男' put 'test_lcc_person','3','lcc_liezu:age:','12' put 'test_lcc_person','4','lcc_liezu:name:','樑川川4' put 'test_lcc_person','4','lcc_liezu:sex:','男' put 'test_lcc_person','4','lcc_liezu:age:','12' put 'test_lcc_person','5','lcc_liezu:name:','樑川川5' put 'test_lcc_person','5','lcc_liezu:sex:'
,'男' put 'test_lcc_person','5','lcc_liezu:age:','12' put 'test_lcc_person','6','lcc_liezu:name:','樑川川6' put 'test_lcc_person','6','lcc_liezu:sex:','男' put 'test_lcc_person','6','lcc_liezu:age:','12'
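
The same rows can be written from Java as well. Below is a minimal sketch for row key 1, again assuming the HBase 1.x client API and reusing the Configuration conf from the create-table sketch above (it additionally needs the Put, Table and Bytes imports from org.apache.hadoop.hbase.client and org.apache.hadoop.hbase.util); the remaining row keys follow the same pattern:

// Inside the same class as the create-table sketch, after the Configuration conf is built:
try (Connection connection = ConnectionFactory.createConnection(conf);
     Table table = connection.getTable(TableName.valueOf("test_lcc_person"))) {
    // One Put per row key; each addColumn call adds one cell in the lcc_liezu column family.
    Put put = new Put(Bytes.toBytes("1"));
    put.addColumn(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("name"), Bytes.toBytes("樑川川1"));
    put.addColumn(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("sex"), Bytes.toBytes("男"));
    put.addColumn(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("age"), Bytes.toBytes("12"));
    table.put(put);
}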

2. Write a Spark program to read the data

package com.lcc.spark.hbase.test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class SparkOnHbase {

    public static void main(String[] args) throws Exception {
        // Local Hadoop home (needed on Windows) and the user to access HDFS/HBase as.
        System.setProperty("hadoop.home.dir", "E:\\02-hadoop\\hadoop-2.7.3\\");
        System.setProperty("HADOOP_USER_NAME", "root");

        // Uncommenting the next line is the fix for the NotSerializableException shown below.
        // System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        SparkConf conf = new SparkConf();
        conf.setAppName("LG_CALCULATE");
        conf.setMaster("local");

        JavaSparkContext context = new JavaSparkContext(conf);


        // HBase client configuration: ZooKeeper quorum and client port of the cluster.
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("hbase.zookeeper.quorum", "192.168.10.82");
        //configuration.set("hbase.master", "192.168.10.82:60000");

        // Scan the whole test_lcc_person table; TableInputFormat reads the table name
        // and a Base64-encoded Scan from the job configuration.
        Scan scan = new Scan();
        String tableName = "test_lcc_person";
        configuration.set(TableInputFormat.INPUT_TABLE, tableName);

        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        configuration.set(TableInputFormat.SCAN, scanToString);

        // Load the table as an RDD of (row key, Result) pairs.
        JavaPairRDD<ImmutableBytesWritable, Result> myRDD = context.newAPIHadoopRDD(configuration,
                TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        System.out.println(myRDD.count());

        // Print every row: row key, name, sex, age.
        myRDD.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable, Result>>() {

            @Override
            public void call(Tuple2<ImmutableBytesWritable, Result> tuple)
                    throws Exception {
                Result result = tuple._2();
                String rowkey = Bytes.toString(result.getRow());
                String name = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("name")));
                String sex = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("sex")));
                String age = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("age")));
                System.out.print(rowkey);
                System.out.print("\t");
                System.out.print(name);
                System.out.print("\t");
                System.out.print(sex);
                System.out.print("\t");
                System.out.print(age);
                System.out.println("\t");

            }

        });
    }
}

Running it fails with the following error:

3201 [Executor task launch worker for task 4] ERROR org.apache.spark.executor.Executor  - Exception in task 0.0 in stage 4.0 (TID 4)
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 31)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.SerializationStream.writeKey(Serializer.scala:133)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:238)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:152)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
3228 [task-result-getter-0] ERROR org.apache.spark.scheduler.TaskSetManager  - Task 0.0 in stage 4.0 (TID 4) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 31); not retrying
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 4.0 (TID 4) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 31)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)

At a glance this is an object serialization problem: the default Java serializer cannot handle org.apache.hadoop.hbase.io.ImmutableBytesWritable, which does not implement java.io.Serializable. Searching Google for "Spark serializable" leads straight to the fix, which is to configure the Kryo serializer for Spark:

 System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

Uncomment that line (it appears commented out near the top of main above) and the job runs without the exception.
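
For reference, the same setting can also be applied directly on the SparkConf instead of going through a system property. The sketch below shows this; registering ImmutableBytesWritable with Kryo is optional and included only as an illustration, it is not required for the fix itself:

SparkConf conf = new SparkConf()
        .setAppName("LG_CALCULATE")
        .setMaster("local")
        // Use Kryo instead of Java serialization, so that classes which do not
        // implement java.io.Serializable (such as ImmutableBytesWritable) can be shipped between tasks.
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // Optional: registering the class lets Kryo write a compact class id instead of the full class name.
        .registerKryoClasses(new Class<?>[] {
                org.apache.hadoop.hbase.io.ImmutableBytesWritable.class
        });

JavaSparkContext context = new JavaSparkContext(conf);

Another common workaround, if you prefer to keep the default Java serializer, is to map the (ImmutableBytesWritable, Result) pairs into plain serializable values (for example Strings extracted from the Result) immediately after newAPIHadoopRDD, so that no HBase type ever has to cross a shuffle or be returned to the driver.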