SyncNavigator資料同步那些事兒(優化過程分享)
WWW.SyncNavigator.CN 資料同步的軟體
SyncNavigator是一款專門用於SqlServer、Mysql資料同步的軟體,由國內頂級開發團隊開發完成,經歷8年逐步完善,目前具備強大的資料同步功能,國內很多大型連鎖超市,企業,公司都在用SyncNavigator資料同步軟體進行著資料同步服務。
它可以為我們提供智慧化資料同步,對您重要的資料庫進行實時同步操作,也可以設定定時任務傳輸,即使您的來源資料庫和目標資料庫版本不一樣,表結構不一樣,甚至是欄位不一樣,SyncNavigator也可以輕鬆幫您實現高效傳輸同步。
如果來源資料庫和目標資料庫表結構,欄位一樣,那麼全部保持預設設定即可,如果是異構資料庫,只需要動動滑鼠,輕鬆繫結來源資料庫表名和欄位,一一匹配,就能完成異構資料庫實時同步。
SyncNavigator可將資料庫同步到不同版本的資料庫上,無論你的資料庫是SqlServer 2000、還是SqlServer2008,還是SqlServer2014等,或者Mysql ,SyncNavigator都能輕鬆在他們之間無縫同步。
SyncNavigator資料庫同步軟體支援斷點續傳同步功能,第一次安裝配置好基本引數之後,就完全不用管,系統會在後臺執行,開關機也不影響資料同步,系統會在下次聯網的時候繼續上次未完成的作業,在資料庫同步的過程中出現故障,也能繼續同步資料庫,並確保資料完整性。
SyncNavigator同步是採用增量資料完成的,所以同步效率很高,每次只同步新資料或者新修改的資料,實時同步基本響應速度是毫秒級的,能迅速將源資料庫產生的新資料,或者修改的資料同步到目標資料庫上,確保資料完整性。
SyncNavigator有完善的日誌、報告郵件傳送功能,能保留每一步同步的步驟,供使用者查閱,郵件提醒功能能實時提醒傳輸進度。
SyncNavigator資料庫傳輸工具可以設定每張表的傳輸順序,以免有些表對別的邊的依賴性,先傳輸導致錯誤。
SyncNavigator資料庫傳輸工具可以設定傳輸開始或者結束後執行指定指令碼或者程式碼,能實現複雜的互動功能,讓使用者更靈活的實現自己需求。
支援分散式資料同步,可以把多個數據庫資料傳輸過來進行整合,也可以同時執行多個傳輸程序。
功能特色
SyncNavigator資料庫同步軟體特點:
1.能夠快速,持續,穩定的同步所需資料庫資料。在來源資料庫資料增加,修改或者刪除後自動同步到目標資料庫。
2.完整支援 Microsoft SQL Server。完整支援 Microsoft SQL Server 2000 2005 2008 2012 2014資料庫型別。並能在不同資料庫版本之間相互穩定高效同步資料,而不會出現問題。
3.完美支援 Mysql 4.1 以上版本。支援 Mysql 4.1 5.0 5.1 5.4 5.5 6.X。並能在不同資料庫版本之間相互同步資料,也可以將SqlServer 資料庫和Mysql資料庫之間進行同步,支援同構,異構資料庫同步。
4.只需要建立一個執行計劃就能在指定時間自動進行資料庫同步。輕鬆管理同步時間以及頻率,可以按天,或者按周,實時同步等,輕鬆設定,一鍵無憂。
5.不編寫SQL語句,不使用資料庫管理工具。與同類產品相比所需資料庫經驗知識最少,傻瓜式的同步設定,大多數情況下只需要設定來源資料庫地址,帳號密碼,目標資料庫地址,帳號密碼,然後其他保持預設,點選開始同步,就能完美完成資料同步需求。
簡介
很久之前就想寫這篇文章了,主要是介紹一下我做資料同步的過程中遇到的一些有意思的內容,和提升效率的過程。
當前在資料處理的過程中,資料同步如同血液一般充滿全過程,如圖:
資料同步開源產品對比:
DataX,是淘寶的開源專案,可惜不支援Postgresql
Sqoop,Apache開源專案,同步過程中欄位需要嚴格一致,不方便擴充套件,不易於二次開發
整體設計思路:
使用生產者消費者模型,中間使用記憶體,資料不落地,直接插入目標資料
優化過程:
1、插入資料部分:
首先生產者通過Jdbc獲取源資料內容,放入固定大小的快取佇列,同時消費者不斷的從快取讀取資料,根據不同的資料型別分別讀取出來,並逐條插入目標資料庫。
速度每秒300條,每分鐘1.8W條。
這樣做表面上看起來非常美好,流水式的處理,來一條處理一下,可是發現插入的速度遠遠趕不上讀取的速度,所以為了提升寫入的速度,決定採用批量處理的方法,事例程式碼:
@Override public Boolean call() { long beginTime = System.currentTimeMillis(); this.isRunning.set(true); try { cyclicBarrier.await(); int lineNum = 0; int commitCount = 0; // 快取數量 List<RowData> tmpRowDataList = new ArrayList<RowData>();// 快取陣列 while (this.isGetDataRunning.get() || this.queue.size() > 0) { // 從佇列獲取一條資料 RowData rowData = this.queue.poll(1, TimeUnit.SECONDS); if (rowData == null) { logger.info("this.isGetDataRunning:" + this.isGetDataRunning + ";this.queue.size():" + this.queue.size()); Thread.sleep(10000); continue; } // 新增到快取陣列 tmpRowDataList.add(rowData); lineNum++; commitCount++; if (commitCount == SyncConstant.INSERT_SIZE) { this.insertContractAch(tmpRowDataList); // 批量寫入 tmpRowDataList.clear(); // 清空快取 commitCount = 0; } if (lineNum % SyncConstant.LOGGER_SIZE == 0) { logger.info(" commit line: " + lineNum + "; queue size: " + queue.size()); } } this.insertContractAch(tmpRowDataList); // 批量寫入 tmpRowDataList.clear();// 清空快取 logger.info(" commit line end: " + lineNum); } catch (Exception e) { logger.error(" submit data error" , e); } finally { this.isRunning.set(false); } logger.info(String.format("SubmitDataToDatabase used %s second times", (System.currentTimeMillis() - beginTime) / 1000.00)); return true; } /** * 批量插入資料 * * @param rowDatas * @return */ public int insertContractAch(List<RowData> rowDatas) { final List<RowData> tmpObjects = rowDatas; String sql = SqlService.createInsertPreparedSql(tableMetaData); // 獲取sql try { int[] index = this.jdbcTemplate.batchUpdate(sql, new PreparedStatementSetter(tmpObjects, this.columnMetaDataList)); return index.length; } catch (Exception e) { logger.error(" insertContractAch error: " , e); } return 0; } /** * 處理批量插入的回撥類 */ private class PreparedStatementSetter implements BatchPreparedStatementSetter { private List<RowData> rowDatas; private List<ColumnMetaData> columnMetaDataList; /** * 通過建構函式把要插入的資料傳遞進來處理 */ public PreparedStatementSetter(List<RowData> rowDatas, List<ColumnMetaData> columnList) { this.rowDatas = rowDatas; this.columnMetaDataList = columnList; } @Override public void setValues(PreparedStatement ps, int i) throws SQLException { RowData rowData = this.rowDatas.get(i); for (int j = 0; j < rowData.getColumnObjects().length; j++) { // 型別轉換 try { ColumnAdapterService.setParameterValue(ps, j + 1, rowData.getColumnObjects()[j], this.columnMetaDataList.get(j).getType()); } catch (Exception e) { ps.setObject(j + 1, null); } } } }
咱們不是需要講解程式碼,所以這裡截取了程式碼片段,全部的程式碼github上有,感興趣的同學可以看看。PreparedStatement的好處,可以參考文章:http://www.cnblogs.com/liqiu/p/3825544.html
由於增加批量插入的功能,終於速度提升到每秒1000條
2、多執行緒優化
每秒1000條,速度依然不理想,特別是寫的速度跟不上讀取的速度,佇列是滿的,如圖:
所以只能提升消費者的數量,採用了多消費者的模式:
速度提升到每秒3000條。
3、升級讀取方式
這時候觀察,隨著消費者的增加,觀察快取佇列經常有空的情況,也就是說生產跟不上消費者速度,如果增加生產者的執行緒,那麼也會增加程式的複雜性,因為勢必要將讀取的資料進行分割。所以採用Pgdump的方式直接獲取資料(並不是所有情況都適用,比如資料中有特殊的分隔符與設定的分隔符一樣,或者有分號,單引號之類的)
程式碼片段如下:
/** * 將資料放入快取佇列 */ public void putCopyData() { DataSourceMetaData dataSource = dataSourceService.getDataSource(syncOptions.getSrcDataSourceName()); String copyCommand = this.getCopyCommand(dataSource, querySql); //獲取copy命令 ShellExecuter.execute(copyCommand, queue,columnMetaDatas); } /** * 執行copy的shell命令 * @param dataSource * @param sql * @return */ public String getCopyCommand(DataSourceMetaData dataSource, String sql){ String host = dataSource.getIp(); String user = dataSource.getUserName(); String dataBaseName = dataSource.getDatabaseName(); //String psqlPath = "/Library/PostgreSQL/9.3/bin/psql"; String psqlPath = "/opt/pg93/bin/psql"; String execCopy = psqlPath + " -h " + host + " -U " + user + " " + dataBaseName +" -c \"COPY (" + sql + ") TO STDOUT WITH DELIMITER E'"+ HiveDivideConstant.COPY_COLUMN_DIVIDE+"' CSV NULL AS E'NULL'\" "; // 執行copy命令 LOGGER.info(execCopy); return execCopy; }
意思就是通過執行一個Shell程式,獲取資料,然後讀取程序的輸出流,不斷寫入快取。這樣生產者的問題基本都解決了,速度完全取決於消費者寫入資料庫的速度了。下面是執行Shell的Java方法程式碼:
public static int execute(String shellPath, LinkedBlockingQueue<RowData> queue, List<ColumnMetaData> columnMetaDatas) { int success = -1; Process pid = null; String[] cmd; try { cmd = new String[]{"/bin/sh", "-c", shellPath}; // 執行Shell命令 pid = Runtime.getRuntime().exec(cmd); if (pid != null) { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pid.getInputStream()), SyncConstant.SHELL_STREAM_BUFFER_SIZE); try { String line; while ((line = bufferedReader.readLine()) != null) { // LOGGER.info(String.format("shell info output [%s]", line)); String[] columnObjects = line.split(HiveDivideConstant.COPY_COLUMN_DIVIDE.toString(), -1); if (columnObjects.length != columnMetaDatas.size()) { LOGGER.error(" 待同步的表有特殊字元,不能使用copy [{}] ", line); throw new RuntimeException("待同步的表有特殊字元,不能使用copy " + line); } RowData rowData = new RowData(line.split(HiveDivideConstant.COPY_COLUMN_DIVIDE.toString(), -1)); queue.put(rowData); } } catch (Exception ioe) { LOGGER.error(" execute shell error", ioe); } finally { try { if (bufferedReader != null) { bufferedReader.close(); } } catch (Exception e) { LOGGER.error("execute shell, get system.out error", e); } } success = pid.waitFor(); if (success != 0) { LOGGER.error("execute shell error "); } } else { LOGGER.error("there is not pid "); } } catch (Exception ioe) { LOGGER.error("execute shell error", ioe); } finally { if (null != pid) { try { //關閉錯誤輸出流 pid.getErrorStream().close(); } catch (IOException e) { LOGGER.error("close error stream of process fail. ", e); } finally { try { //關閉標準輸入流 pid.getInputStream().close(); } catch (IOException e) { LOGGER.error("close input stream of process fail.", e); } finally { try { pid.getOutputStream().close(); } catch (IOException e) { LOGGER.error(String.format("close output stream of process fail.", e)); } } } } } return success; }
4、記憶體優化
在上線一段時間之後,發現兩個問題:1、使用Jdbc方式獲取資料,如果這個資料表比較大,那麼獲取第一條資料的速度特別慢;2、這個程序還會佔用非常大的記憶體,並且GC不掉。分析原因,是Postgresql的Jdbc獲取資料的時候,會一次將所有資料放入到記憶體,如果同步的資料表非常大,那麼甚至會將記憶體撐爆。
那麼優化的方法是設定使Jdbc不是一次全部將資料拿到記憶體,而是批次獲取,程式碼如下:
con.setAutoCommit(false); //並不是所有資料庫都適用,比如hive就不支援,orcle不需要 stmt.setFetchSize(10000); //每次獲取1萬條記錄
整體設計方案:
現在這個專案已經開源,程式碼放在:https://github.com/autumn-star/synchronous