1. 程式人生 > MySQL 資料實時同步 HBase

MySQL 資料實時同步 HBase

一、前言

要實時同步資料，首先要能實時地監控資料庫資料的變化，可以使用 canal、Maxwell 等工具完成。我選用 canal，因為它更靈活，更符合我的專案需求。

二、通過canal監控資料庫資料變化

三、專案整體架構

四、主要程式碼

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;
import qjm.data.synch.hbase.HbaseSerialization;
import qjm.data.synch.hbase.HbaseUtils;
import qjm.data.synch.modle.Employee;
import qjm.data.synch.service.SqlDataService;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Real-time synchronization of relational (MySQL) row changes into HBase.
 *
 * <p>Row-level binlog events are consumed from a canal server. For every INSERT/UPDATE the
 * current row is re-read through {@link SqlDataService} and written to HBase; for every DELETE
 * the corresponding HBase row is removed. Only the {@code hr_employee} table is currently mapped;
 * events for other tables are ignored.
 */
public class OnlineSynch {
    static final Log LOG = LogFactory.getLog(OnlineSynch.class);

    /** Default canal server host used by {@link #synchToHbase()}. */
    private static final String DEFAULT_CANAL_HOST = "192.168.135.132";
    /** Default canal server port used by {@link #synchToHbase()}. */
    private static final int DEFAULT_CANAL_PORT = 11111;
    /** Default canal destination (instance) name used by {@link #synchToHbase()}. */
    private static final String DEFAULT_DESTINATION = "example";
    /** Default subscription filter: every table of the {@code grg_hr} schema. */
    private static final String DEFAULT_SUBSCRIBE_FILTER = "grg_hr\\..*";
    /** Maximum number of entries fetched per canal batch. */
    private static final int BATCH_SIZE = 1000;
    /** Batch id the canal client reports when a fetch returned nothing. */
    private static final long EMPTY_BATCH_ID = -1L;
    /** Table whose rows are mirrored into HBase. */
    private static final String EMPLOYEE_TABLE = "hr_employee";

    SqlDataService sqlDataService = new SqlDataService("SqlMapConfig.xml");
    HbaseUtils hbaseUtils = new HbaseUtils();

    /**
     * Synchronizes data from the relational database to HBase using the default canal
     * endpoint, destination and subscription filter.
     *
     * <p>Blocks until the calling thread is interrupted or an error occurs.
     *
     * @throws RuntimeException wrapping any failure while consuming or applying events
     */
    public void synchToHbase() {
        synchToHbase(new InetSocketAddress(DEFAULT_CANAL_HOST, DEFAULT_CANAL_PORT),
                DEFAULT_DESTINATION, DEFAULT_SUBSCRIBE_FILTER);
    }

    /**
     * Synchronizes data from the relational database to HBase by consuming events from the
     * given canal server.
     *
     * <p>Each batch is fetched without auto-ack, applied, and then acknowledged. If applying
     * a batch fails, that (still un-acknowledged) batch is rolled back so canal redelivers it,
     * and the failure is rethrown. Returns normally when the thread is interrupted while idle.
     *
     * @param canalAddress    address of the canal server
     * @param destination     canal destination (instance) name
     * @param subscribeFilter canal table filter regex, e.g. {@code "schema\\..*"}
     * @throws RuntimeException wrapping any failure while consuming or applying events
     */
    public void synchToHbase(InetSocketAddress canalAddress, String destination, String subscribeFilter) {
        CanalConnector connector = CanalConnectors.newSingleConnector(canalAddress, destination, "", "");
        // Id of the batch that has been fetched but not yet acknowledged; null when none is pending.
        // Tracking this (instead of "last seen id") avoids rolling back a batch that was already acked.
        Long pendingBatchId = null;
        try {
            connector.connect();
            // Restrict the subscription to the configured schema/tables.
            connector.subscribe(subscribeFilter);
            // Roll back anything left un-acked by a previous run so it is redelivered.
            connector.rollback();
            while (true) {
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                pendingBatchId = batchId;
                int size = message.getEntries().size();
                if (batchId == EMPTY_BATCH_ID || size == 0) {
                    LOG.info("waitting...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Honour the interrupt: restore the flag and stop. The batch is empty,
                        // so there is nothing to apply; the finally block disconnects.
                        Thread.currentThread().interrupt();
                        LOG.info("Synchronization interrupted, stopping.");
                        return;
                    }
                } else {
                    LOG.info(String.format("\nmessage[batchId=%s,size=%s]", batchId, size));
                    handleEntry(message.getEntries());
                }

                connector.ack(batchId); // commit / acknowledge the batch
                pendingBatchId = null;
            }
        } catch (Exception e) {
            // Processing failed: roll back the un-acknowledged batch so it is redelivered.
            rollbackQuietly(connector, pendingBatchId, e);

            LOG.error("Error: " + e.getMessage(), e);
            throw new RuntimeException(e);
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Rolls back {@code batchId} if it denotes a real, un-acknowledged batch. A failure of the
     * rollback itself is attached to {@code cause} as a suppressed exception rather than
     * replacing it.
     */
    private static void rollbackQuietly(CanalConnector connector, Long batchId, Exception cause) {
        if (batchId == null || batchId == EMPTY_BATCH_ID) {
            return;
        }
        try {
            connector.rollback(batchId);
        } catch (Exception rollbackError) {
            cause.addSuppressed(rollbackError);
        }
    }

    /**
     * Applies a list of canal entries: row-data entries are decoded and each changed row is
     * dispatched to {@link #deleteData} (DELETE) or {@link #updateData} (INSERT/UPDATE).
     * Other entry types (transaction begin/end, heartbeats, ...) and other event types are
     * skipped.
     *
     * @param entries entries of one canal batch
     * @throws RuntimeException if a row-data entry cannot be decoded
     */
    private void handleEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Only ROWDATA entries carry a serialized RowChange payload.
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            // Log the event's binlog position, table and type.
            CanalEntry.EventType eventType = rowChange.getEventType();
            Header header = entry.getHeader();
            LOG.info(String.format("\n================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    header.getLogfileName(), header.getLogfileOffset(),
                    header.getSchemaName(), header.getTableName(),
                    eventType));

            String tableName = header.getTableName();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                switch (eventType) {
                    case DELETE:
                        LOG.info("\n-------> delete");
                        deleteData(tableName, rowData.getBeforeColumnsList());
                        break;
                    case INSERT:
                        LOG.info("\n-------> insert");
                        updateData(tableName, rowData.getAfterColumnsList());
                        break;
                    case UPDATE:
                        LOG.info("\n-------> after");
                        updateData(tableName, rowData.getAfterColumnsList());
                        break;
                    default:
                        // Other event types are not synchronized.
                        break;
                }
            }
        }
    }

    /**
     * Upserts one row into HBase.
     *
     * <ol>
     *   <li>Skip tables that are not mapped.</li>
     *   <li>Extract the primary key from the row's columns.</li>
     *   <li>Re-read the full row from the relational database by that key.</li>
     *   <li>Write it to HBase.</li>
     * </ol>
     * HBase failures are logged and not propagated.
     *
     * @param tableName source table name
     * @param columns   column values of the changed row (after image)
     * @throws RuntimeException if a mapped table's row has no parsable {@code id} column
     */
    private void updateData(String tableName, List<Column> columns) {
        // Check the table first so unmapped tables never need an "id" column.
        if (!EMPLOYEE_TABLE.equals(tableName)) {
            return;
        }

        Long key = getKey(columns);
        HbaseSerialization serialization = sqlDataService.getEmployeeById(key);
        if (serialization == null) {
            return;
        }

        try {
            Employee employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class);
            LOG.info("before : \n" + employee);

            hbaseUtils.putData(serialization);

            employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class);
            LOG.info("after : \n" + employee);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Deletes one row from HBase.
     *
     * <ol>
     *   <li>Skip tables that are not mapped.</li>
     *   <li>Extract the primary key from the row's columns.</li>
     *   <li>Delete the HBase row with that key.</li>
     * </ol>
     * HBase failures are logged and not propagated.
     *
     * @param tableName source table name
     * @param columns   column values of the deleted row (before image)
     * @throws RuntimeException if a mapped table's row has no parsable {@code id} column
     */
    private void deleteData(String tableName, List<Column> columns) {
        Class clazz = null;
        if (EMPLOYEE_TABLE.equals(tableName)) {
            clazz = Employee.class;
        }
        // Unmapped table: nothing to delete (previously a null class was passed to HBase).
        if (clazz == null) {
            return;
        }

        Long key = getKey(columns);

        try {
            Employee employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class);
            LOG.info("before : \n" + employee);

            hbaseUtils.deleteData(clazz, new Delete(Bytes.toBytes(key)));

            employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class);
            LOG.info("after : \n" + employee);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Extracts the primary key, i.e. the value of the column named {@code id}, as a {@code Long}.
     *
     * @param columns columns of a changed row
     * @return the parsed {@code id} value
     * @throws RuntimeException if there is no {@code id} column or its value is not a valid long
     */
    private Long getKey(List<Column> columns) {
        for (Column column : columns) {
            if ("id".equals(column.getName())) {
                try {
                    return Long.valueOf(column.getValue());
                } catch (NumberFormatException e) {
                    throw new RuntimeException("Invalid primary key value: '" + column.getValue() + "'", e);
                }
            }
        }
        throw new RuntimeException("Not found primary key !");
    }

}

五、專案程式碼