程式人生 > 使用MapReduce讀取HBase資料儲存到MySQL

使用MapReduce讀取HBase資料儲存到MySQL

Mapper讀取HBase資料

package MapReduce;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Mapper that turns each HBase row it receives into one
 * {@link phoneInfoDBWritable} record.
 *
 * <p>For every row, the cells whose column qualifier is {@code caller},
 * {@code callee}, {@code time}, {@code dur} or {@code flag} are copied into a
 * {@link phoneInfo}; the record is then emitted as the output key with a
 * {@code null} value.
 */
public class CallMapper extends TableMapper<phoneInfoDBWritable,phoneInfoDBWritable>{

    // Reused for every row: all five fields are overwritten on each map()
    // call, so no state carries over from one row to the next.
    private final phoneInfo pp = new phoneInfo();

    /**
     * Extracts the call-record columns from one HBase row and emits them.
     *
     * <p>A column that is absent from the row is emitted as an empty string.
     * If several cells share the same qualifier, the last one in
     * {@code value.rawCells()} wins. The row key is not parsed, because
     * nothing derived from it is written to the output.
     *
     * @param key     row key of the current row (unused)
     * @param value   the cells of the current row
     * @param context task context the record is written to
     * @throws IOException          if writing the record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        String caller = "";
        String callee = "";
        String time = "";
        String dur = "";
        String flag = "";

        for (Cell cell : value.rawCells()) {
            String qualifier = new String(CellUtil.cloneQualifier(cell), StandardCharsets.UTF_8);
            switch (qualifier) {
                case "caller":
                    caller = decodeValue(cell);
                    break;
                case "callee":
                    callee = decodeValue(cell);
                    break;
                case "time":
                    time = decodeValue(cell);
                    break;
                case "dur":
                    dur = decodeValue(cell);
                    break;
                case "flag":
                    flag = decodeValue(cell);
                    break;
                default:
                    // Columns that are not part of the call record are ignored.
                    break;
            }
        }

        pp.setCaller(caller);
        pp.setCallee(callee);
        pp.setTime(time);
        pp.setDur(dur);
        pp.setFlag(flag);

        // The record travels as the output key; no value is emitted.
        context.write(new phoneInfoDBWritable(pp), null);
    }

    /**
     * Decodes a cell's value as UTF-8 text, independent of the JVM's default
     * charset.
     */
    private static String decodeValue(Cell cell) {
        return new String(CellUtil.cloneValue(cell), StandardCharsets.UTF_8);
    }
}

Driver配置分發任務

package MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

/**
 * Job driver: configures a map-only job that scans the HBase table
 * {@code phone:log} with {@link CallMapper} and writes the mapper output
 * through {@link DBOutputFormat} into the {@code phone} table.
 */
public class MRRunner {

    /** HBase table the job scans. */
    private static final String SOURCE_TABLE = "phone:log";

    /**
     * Builds the job, submits it, waits for it to finish and exits the JVM
     * with status 0 when the job succeeded, 1 otherwise.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if configuring or running the job fails
     */
    public static void main(String[] args) throws Exception {

        // HBase client settings: where to find ZooKeeper.
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(hbaseConf, "db store");

        // JDBC connection used by the output format.
        DBConfiguration.configureDB(
                job.getConfiguration(),
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/callphone",
                "root",
                "root");
        // Target table followed by the columns each output record fills.
        DBOutputFormat.setOutput(job, "phone", "caller", "callee", "time", "dur", "flag");

        job.setJarByClass(MRRunner.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        Scan tableScan = newTableScan();

        job.setMapperClass(CallMapper.class);
        TableMapReduceUtil.initTableMapperJob(
                SOURCE_TABLE,
                tableScan,
                CallMapper.class,
                phoneInfoDBWritable.class,
                phoneInfoDBWritable.class,
                job);

        // Map-only job: no reducer is configured.
        job.setNumReduceTasks(0);

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

    /**
     * Creates the scan handed to the table mapper: block caching disabled and
     * scanner caching set to 500.
     */
    private static Scan newTableScan() {
        Scan tableScan = new Scan();
        tableScan.setCacheBlocks(false);
        tableScan.setCaching(500);
        return tableScan;
    }
}

phoneInfo封裝讀取到的HBase資料

package MapReduce;

/**
 * Plain mutable holder for one call record: caller, callee, time, duration
 * and flag, all kept as strings.
 *
 * <p>Every field is {@code null} until its setter has been called.
 */
public class phoneInfo{

    /** Value of the {@code caller} field. */
    private String caller;
    /** Value of the {@code callee} field. */
    private String callee;
    /** Value of the {@code time} field. */
    private String time;
    /** Value of the {@code dur} field. */
    private String dur;
    /** Value of the {@code flag} field. */
    private String flag;

    // ---- getters ----

    /** @return the stored caller, or {@code null} if never set */
    public String getCaller() {
        return this.caller;
    }

    /** @return the stored callee, or {@code null} if never set */
    public String getCallee() {
        return this.callee;
    }

    /** @return the stored time, or {@code null} if never set */
    public String getTime() {
        return this.time;
    }

    /** @return the stored duration, or {@code null} if never set */
    public String getDur() {
        return this.dur;
    }

    /** @return the stored flag, or {@code null} if never set */
    public String getFlag() {
        return this.flag;
    }

    // ---- setters ----

    /** @param newCaller value to store as the caller */
    public void setCaller(String newCaller) {
        caller = newCaller;
    }

    /** @param newCallee value to store as the callee */
    public void setCallee(String newCallee) {
        callee = newCallee;
    }

    /** @param newTime value to store as the time */
    public void setTime(String newTime) {
        time = newTime;
    }

    /** @param newDur value to store as the duration */
    public void setDur(String newDur) {
        dur = newDur;
    }

    /** @param newFlag value to store as the flag */
    public void setFlag(String newFlag) {
        flag = newFlag;
    }
}

phoneInfoDBWritable實現DBWritable用於存放phoneInfo物件

package MapReduce;

import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * 編寫phoneInfoDBWritable類實現DBWritable,完成HBase的資料寫入到指定的MySQL的序列化
 */
public class phoneInfoDBWritable implements DBWritable {

    private phoneInfo phoneinfo;

    public phoneInfoDBWritable() { }

    public phoneInfoDBWritable(phoneInfo phoneinfo) {
        this.phoneinfo = phoneinfo;
    }
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, phoneinfo.getCaller());
        statement.setString(2, phoneinfo.getCallee());
        statement.setString(3, phoneinfo.getTime());
        statement.setString(4, phoneinfo.getDur());
        statement.setString(5, phoneinfo.getFlag());
    }

    public void readFields(ResultSet resultSet) throws SQLException {

    }
}