1. 程式人生 > >jdbc操作 資料庫同步,全量,加入執行緒,批處理

jdbc操作 資料庫同步,全量,加入執行緒,批處理

動態資料庫的全量、增量同步,多執行緒增加效能,批處理。

程式碼可以直接執行:

同步的類:

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import
java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; public class SynchronizationController{ //同步源 static String url_source="jdbc:mysql://localhost:3306/zntz?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"
; //目標庫 static String url_destination="jdbc:mysql://localhost:3306/xx01?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; static Connection conn_source = null; static Connection conn_destination = null; static String sql_read; static String sql_insert; static
final int batchSize = 15000; static final int max_thread_size=5; public static void init(){ } public static void writeData(){ } public static void main(String[] args) throws SQLException, InterruptedException { try { Class.forName("com.mysql.jdbc.Driver"); conn_source = DriverManager.getConnection(url_source); conn_destination= DriverManager.getConnection(url_destination); conn_destination.setAutoCommit(false); synchronizationTables(conn_source, conn_destination); addData(conn_source, conn_destination); } catch (ClassNotFoundException e) { e.printStackTrace(); }catch (SQLException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { while(true){ if(InsertThread.getThreadCounts()>0){ Thread.sleep(1000); }else{ break; } } conn_source.close(); conn_destination.close(); } } //本地獲取表名獲取表名 public static Set<String> getTableName(Connection con) { Set<String> set = new HashSet<String>(); try { DatabaseMetaData meta = con.getMetaData(); ResultSet rs = meta.getTables(null, null, null,new String[] { "TABLE" }); while (rs.next()) { set.add(rs.getString("TABLE_NAME")); // String s = rs.getString("TABLE_NAME"); // String type = rs.getString("TABLE_TYPE"); // System.out.println(s+"======"+type); // getTableDDL(rs.getString("TABLE_NAME"), con); } } catch (Exception e) { e.printStackTrace(); } return set; } //目標資料庫 public static Map<String,String> getTableNameToMap(Connection con) { Map<String,String> map=new HashMap<String,String>(); try { DatabaseMetaData meta = con.getMetaData(); ResultSet rs = meta.getTables(null, null, null,new String[] { "TABLE" }); while (rs.next()) { map.put(rs.getString("TABLE_NAME"),"1"); } } catch (Exception e) { e.printStackTrace(); } return map; } //建立表 public static void createTable(String sql_ddl) throws SQLException { Statement stmt = conn_destination.createStatement(); int result = stmt.executeUpdate(sql_ddl);// executeUpdate語句會返回一個受影響的行數,如果返回-1就沒有成功 if (result != -1) { System.out.println("表建立成功"); }else{ System.out.println("表建立失敗:"+sql_ddl); } } //建立sql public static String getTableField(String tableName,Connection con) throws SQLException{ String sql = "select * from "+tableName; Statement state = con.createStatement(); ResultSet rs = state.executeQuery(sql); ResultSetMetaData rsd = rs.getMetaData() ; StringBuffer sql_model=new StringBuffer("insert into "+ tableName +" ("); StringBuffer sql_param=new StringBuffer(" VALUES("); for(int i = 1; i <= rsd.getColumnCount(); i++) { sql_model.append(rsd.getColumnName(i)); sql_param.append("?"); if (i < rsd.getColumnCount()) { sql_model.append(","); sql_param.append(","); } } sql_model.append(") ");sql_param.append(") "); System.out.println(sql_model.toString()+sql_param.toString()); return sql_model.toString()+sql_param.toString(); } public static void getTableField2(String tableName,Connection conn) throws SQLException{ ResultSet rs = conn.getMetaData().getColumns(null, conn.getMetaData().getUserName(),tableName.toUpperCase(), "%"); while(rs.next()){ String colName = rs.getString("COLUMN_NAME"); String remarks = rs.getString("REMARKS"); String dbType = rs.getString("TYPE_NAME"); System.out.println(colName+","+remarks+","+dbType); } } //獲取表結構ddl public static String getTableDDL(String tableName,Connection conn) throws SQLException{ ResultSet rs = null; PreparedStatement ps = null; ps = conn.prepareStatement("show create table "+tableName); rs = ps.executeQuery(); StringBuffer ddl=new StringBuffer(); while (rs.next()) { ddl.append(rs.getString(rs.getMetaData().getColumnName(2))); } return ddl.toString(); } /** * 檢查本地庫所有表在B庫裡是否存在,是否一致 * A本地庫 B目標庫 */ public static void synchronizationTables(Connection conA,Connection conB) throws SQLException{ Set<String> a_set=getTableName(conA); Map<String,String> b_map=getTableNameToMap(conB); Iterator<String> it=a_set.iterator(); while(it.hasNext()){ String n=it.next(); if(b_map.get(n)==null){ System.out.println("表名:"+n+" 不在目標庫裡"); String create_table_ddl=getTableDDL(n, conA); createTable(create_table_ddl); } } } //清楚表資料 public static boolean clearTableData(String tableName,Connection con){ try { Statement stmt = con.createStatement(); String sql = "TRUNCATE TABLE "+tableName; stmt.executeUpdate(sql); System.out.println(tableName+":表資料已被清空"); } catch (SQLException e) { e.printStackTrace(); System.out.println("異常表:"+tableName+"----資料清空失敗"); return false; } return true; } public static void addData(Connection conA,Connection conB) throws SQLException, InterruptedException{ Statement stmt_source = conA.createStatement(); Set<String> tableNameSet=getTableName(conn_source); Iterator<String> it = tableNameSet.iterator(); //遍歷表 while (it.hasNext()) { long start = System.currentTimeMillis(); String str = it.next(); if(!clearTableData(str, conB)){ continue; } while(true){ if(InsertThread.getThreadCounts()>0){ Thread.sleep(3000); }else{ break; } } String sql_insert=getTableField(str, conA); //獲取總條數 分頁查詢 String sql_count="select count(*) from "+ str; ResultSet rs = stmt_source.executeQuery(sql_count); rs.next(); int totalCount=rs.getInt(1); if(totalCount>batchSize){ int max=totalCount%batchSize==0 ? totalCount/batchSize : totalCount/batchSize+1; for(int i=0;i<max;i++){ synchronized (InsertThread.class) { String sql_data="select * from "+str+" limit "+ i*batchSize + " , "+batchSize; int tCount = InsertThread.getThreadCounts(); while (tCount >= max_thread_size) { System.out.println("系統當前執行緒數為:" + tCount+ ",已達到最大執行緒數 "+max_thread_size+",請等待其他執行緒執行完畢並釋放系統資源"); InsertThread.class.wait(); tCount = InsertThread.getThreadCounts(); } // 重新啟動一個子執行緒 Thread td = new InsertThread(sql_data, sql_insert, conB, conA); td.start(); System.out.println("已建立新的子執行緒: " + td.getName()); } } }else{ String sql_data="select * from "+str; Thread td = new InsertThread(sql_data, sql_insert, conB, conA); td.start(); } long end = System.currentTimeMillis(); System.out.println(str+" 表資料匯入完成,耗時:"+(end-start)/1000+"秒,"+(end-start)/60000+"分鐘"); } } }

執行緒控制類:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class InsertThread extends Thread {

    private String sql_data;

    private String sql_insert;

    private Connection conB;

    private Connection conA;

    static  int batchSize = 2500;

    // 執行緒計數器  
    static private int threadCounts;  
    // 執行緒名稱池  
    static private String threadNames[];  

    static {  
        // 假設這裡允許系統同時執行最大執行緒數為10個  
        int maxThreadCounts = 5;  
        threadNames = new String[maxThreadCounts];  
        // 初始化執行緒名稱池  
        for (int i = 1; i <= maxThreadCounts; i++) {  
            threadNames[i - 1] = "子執行緒_" + i;  
        }  
    }  

    public InsertThread() {  
        // 臨界資源鎖定  
        synchronized (InsertThread.class) {  
            // 執行緒總數加1  
            threadCounts++;  
            // 從執行緒名稱池中取出一個未使用的執行緒名  
            for (int i = 0; i < threadNames.length; i++) {  
                if (threadNames[i] != null) {  
                    String temp = threadNames[i];  
                    // 名被佔用後清空  
                    threadNames[i] = null;  
                    // 初始化執行緒名稱  
                    this.setName(temp);  
                    break;  
                }  
            }  
        }  
    }  

    public void run() {  
        try {
            Long start = System.currentTimeMillis();
            Statement stmt_source = conA.createStatement();
            ResultSet rs_sql_data = stmt_source.executeQuery(sql_data);
            ResultSetMetaData rsmd = rs_sql_data.getMetaData();
            PreparedStatement ps = conB.prepareStatement(sql_insert);
            int columnCount=rsmd.getColumnCount();
            int count=1;
            while (rs_sql_data.next()) {
                count++;
                for(int k=1;k<=columnCount;k++){
                  ps.setString(k, rs_sql_data.getString(k));
                }                
                ps.addBatch();
                if(count % batchSize == 0) {
                    ps.executeBatch();
                    conB.commit();
                }
            }
            ps.executeBatch();
            conB.commit();
            Long end = System.currentTimeMillis();
            System.out.println(this.getName()+",耗時:"+(end-start)/1000 + "秒");
            stmt_source.close();
            rs_sql_data.close();
            ps.close();
        } catch (Exception e) {  
            System.out.println(e);  
        } finally {  
            synchronized (InsertThread.class) {  

                // 釋放執行緒名稱  
                String[] threadName = this.getName().split("_");  
                // 執行緒名使用完後放入名稱池  
                threadNames[Integer.parseInt(threadName[1]) - 1] = this.getName();  

                // 執行緒執行完畢後減1  
                threadCounts--;  
                /* 
                 * 通知其他被阻塞的執行緒,但如果其他執行緒要執行,則該同步塊一定要執行結束(即直 
                 * 到釋放佔的鎖),其他執行緒才有機會執行,所以這裡的只是喚醒在此物件監視器上等待 
                 * 的所有執行緒,讓他們從等待池中進入物件鎖池佇列中,而這些執行緒重新執行時它們一定 
                 * 要先要得該鎖後才可能執行,這裡的notifyAll是不會釋放鎖的,試著把下面的睡眠語 
                 * 句註釋去掉,即使你已呼叫了notify方法,發現CreateThread中的同步塊還是好 
                 * 像一直處於物件等待狀態,其實呼叫notify方法後,CreateThread執行緒進入了物件鎖 
                 * 池佇列中了,只要它一獲取到鎖,CreateThread所線上程就會真真的被喚醒並執行。 
                 */  
                InsertThread.class.notifyAll();  

                System.out.println("----" + this.getName() + " 所佔用資源釋放完畢,當前系統正在執行的子執行緒數:"  
                        + threadCounts);  
            }  
        }  
    }  


    static public int getThreadCounts() {  
        synchronized (InsertThread.class) {  
            return threadCounts;  
        }  
    }

    public InsertThread(String sql_data, String sql_insert, Connection conB, Connection conA) {
        super();
        this.sql_data = sql_data;
        this.sql_insert = sql_insert;
        this.conB = conB;
        this.conA = conA;
        // 臨界資源鎖定  
        synchronized (InsertThread.class) {  
            // 執行緒總數加1  
            threadCounts++;  
            // 從執行緒名稱池中取出一個未使用的執行緒名  
            for (int i = 0; i < threadNames.length; i++) {  
                if (threadNames[i] != null) {  
                    String temp = threadNames[i];  
                    // 名被佔用後清空  
                    threadNames[i] = null;  
                    // 初始化執行緒名稱  
                    this.setName(temp);  
                    break;  
                }  
            }  
        }  
    }





}