MySQL 資料實時同步到 HBase
阿新 • • 發佈:2018-12-18
一、前言
要實時同步資料,首先要能實時監控到資料庫資料的變化,可以使用 canal、Maxwell 等工具完成。我選用 canal,因為它更靈活,更符合我的專案需求。
二、通過canal監控資料庫資料變化
三、專案整體架構
四、主要程式碼
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.util.Bytes; import qjm.data.synch.hbase.HbaseSerialization; import qjm.data.synch.hbase.HbaseUtils; import qjm.data.synch.modle.Employee; import qjm.data.synch.service.SqlDataService; import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; /** * 實時同步資料 */ public class OnlineSynch { static final Log LOG = LogFactory.getLog(OnlineSynch.class); SqlDataService sqlDataService = new SqlDataService("SqlMapConfig.xml"); HbaseUtils hbaseUtils = new HbaseUtils(); /** * 從關係型資料庫同步資料到hbase */ public void synchToHbase(){ // 建立連結 CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("192.168.135.132", 11111), "example", "", "" ); int batchSize = 1000; Long batchId = null; try { connector.connect(); //指定監聽資料庫 connector.subscribe("grg_hr\\..*"); connector.rollback(); while (true) { // 獲取指定數量的資料 Message message = connector.getWithoutAck(batchSize); batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { LOG.info("waitting..."); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { LOG.info(String.format("\nmessage[batchId=%s,size=%s]", batchId, size)); handleEntry(message.getEntries()); } connector.ack(batchId); // 提交確認 } } catch (Exception e) { // 處理失敗, 回滾資料 if (batchId != null) connector.rollback(batchId); LOG.error("Error: " + e.getMessage()); throw new RuntimeException(e); } finally { connector.disconnect(); } } /** * 處理 * @param entries */ private void handleEntry(List<Entry> entries) { //迴圈事件 
for (Entry entry : entries) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChange = null; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } //輸出事件資訊 CanalEntry.EventType eventType = rowChange.getEventType(); Header header = entry.getHeader(); LOG.info(String.format("\n================> binlog[%s:%s] , name[%s,%s] , eventType : %s", header.getLogfileName(), header.getLogfileOffset(), header.getSchemaName(), header.getTableName(), eventType)); //解析事件 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (eventType == EventType.DELETE) { LOG.info("\n-------> delete"); deleteData(header.getTableName(), rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { LOG.info("\n-------> insert"); updateData(header.getTableName(), rowData.getAfterColumnsList()); } else if (eventType == EventType.UPDATE) { //LOG.info("\n-------> before"); //printColumn(rowData.getBeforeColumnsList()); LOG.info("\n-------> after"); updateData(header.getTableName(),rowData.getAfterColumnsList()); } } } } /** * 更新資料 */ private void updateData(String tableName, List<Column> columns){ /** * 1. 獲取主鍵 * 2. 根據主鍵查詢 * 3. 
更新到hbase */ //獲取主鍵 Long key = getKey(columns); HbaseSerialization serialization = null; //根據不同表做處理 if(tableName.equals("hr_employee")){ serialization = sqlDataService.getEmployeeById(key); } if (serialization != null){ try { Employee employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class); LOG.info("before : \n"+employee); hbaseUtils.putData(serialization); employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class); LOG.info("before : \n"+employee); } catch (Exception e) { LOG.error(e.getMessage()); } } } /** * 刪除資料 */ private void deleteData(String tableName, List<Column> columns){ /** * 1. 獲取主鍵 * 2. 根據主鍵刪除hbase資料 */ //獲取主鍵 Long key = getKey(columns); Class clazz = null; //根據不同表做處理 if(tableName.equals("hr_employee")){ clazz = Employee.class; } try { Employee employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class); LOG.info("before : \n"+employee); hbaseUtils.deleteData(clazz, new Delete(Bytes.toBytes(key))); employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class); LOG.info("after : \n"+employee); } catch (Exception e) { LOG.error(e.getMessage()); } } /** * 獲取主鍵 * @return */ private Long getKey(List<Column> columns){ try{ for (Column column : columns) { if(column.getName().equals("id")){ return Long.valueOf(column.getValue()); } } }catch (Exception e){ e.printStackTrace(); throw new RuntimeException("Not found primary key !"); } throw new RuntimeException("Not found primary key !"); } }