1. 程式人生 > >mysql函式大批量插入資料+java多執行緒併發案例

mysql函式大批量插入資料+java多執行緒併發案例

當前需要做個測試,mysql資料庫需要5000萬資料來完成效能測試,於是我們寫了個函式來完成這個功能。

函式定義如下:

#批量插入資料庫的函式
BEGIN
    #定義需要插入資料庫表中的欄位;
 
    DECLARE id BIGINT(20);
    DECLARE phone_number varchar(11);
 
    #定義變數控制插入資料庫表的行數
    DECLARE sid bigint(20) DEFAULT 1;
    DECLARE count bigint(20);
    declare total bigint(20) DEFAULT 50000000;

    #開啟迴圈判斷
    while sid < total do
        SET count = sid + 100000; //每次插入資料庫條數,防止條數太多提交失敗;

    #開始事務
    start transaction;
    WHILE (sid < count) DO //迴圈插入資料庫資料
    SET id = sid;
    SET phone_number = CONCAT(1, ROUND(ROUND(RAND(),10)*10000000000));
    INSERT INTO t_contact_phone(sid,phone_number)
    VALUES(id,phone_number);
    #累計行數
    SET sid = sid+1;
    END WHILE;
    commit;  //提交事務
  end while; //結束迴圈
END

執行上述函式即可建立5000萬條資料在測試庫用於測試。

當我們需要同時執行大批量操作,那如果依然順序單執行緒執行,會比較耗時,為了提高效率,我們需要通過多執行緒非同步方式來提高效率;

比方說大批量資料的同步方案,只能通過批量查詢+多執行緒+非同步的方式來完成。才能更短的時間內更高效的完成任務。

public class SynchronizationData {
private static final Logger logger = LoggerFactory.getLogger(SynchronizationData.class);
@Autowired
private ApplicationSumaryMapper applicationSumaryMapper
;//需要同步的資料表(mybatis完成) @Autowired private ContactPhoneMapper contactPhoneMapper;//需要插入的資料表一(mybatis完成) @Autowired private MiddleFieldMapper middleFieldMapper; //需要插入的資料表二(mybatis完成) private static ExecutorService executor = Executors.newFixedThreadPool(2); //定義執行緒池用於多執行緒執行。 //需要同步總條數 private long totalNumber = 0; //需要同步的最大進件id; private long maxSid = 0; //當前已經同步的sid private long currentSid = 0; private long currentPage = 0; private long maxPage = 0; public void startSynData() { //獲取當前系統中最大的sid maxSid = applicationSumaryMapper.getMaxSid(); logger.info("需要同步的最大進件id(SID)為:", maxSid); //根據sid獲取總條數 totalNumber = applicationSumaryMapper.getTotalNumber(maxSid); logger.info("需要同步的進件總數為:", totalNumber); maxPage = totalNumber / 1000 + 1; logger.info("最大頁數為:", maxPage); //判斷能否開始執行,執行時間為晚上12點到早上8點 boolean synStart = isSynStart(currentSid, maxSid); if (synStart) { //開始分頁查詢 for (long i = 0; i <= maxPage;) { List<ContactPhone> contactList = new ArrayList<ContactPhone>(); List<MiddleField> middleList = new ArrayList<MiddleField>(); currentPage = i;
            //查詢需要同步的資料集
            List<ApplicationSumary> synDataList = applicationSumaryMapper.getSynData(currentPage, currentSid, maxSid);
            //遍歷集合組裝資料
prepareDataForBatchInsert(contactList, middleList, synDataList);
            i = batchAddData(i, contactList, middleList, synDataList);
        }
    } else {
        //不符合同步條件,啥也不做
}
}

@Transactional //通過事務控制同成功,同失敗。
private long batchAddData(long i, List<ContactPhone> contactList, List<MiddleField> middleList, List<ApplicationSumary> synDataList) {
    //非同步批量插入contactList
Future<Integer> contactNumber = executor.submit(new contactPhoneCallable(contactList));
    //非同步批量插入middleList
Future<Integer> middleNumber = executor.submit(new middleFiledCallable(middleList));
    try {
        Integer number1 = contactNumber.get();
        Integer number2 = middleNumber.get();
        //如果都成功則繼續
if (number1 != -1 && number2 != -1) {
         i++;
        } else {
            //失敗則繼續當前頁執行
}

    } catch (Exception e) {
        e.printStackTrace();
        //異常並記錄當前的sid
logger.info("異常最小的sid為:"+synDataList.get(0).getSid());
        logger.info("異常最大的sid為:"+synDataList.get(synDataList.size()-1).getSid());
    }
    return i;
}
//遍歷要同步的資料集合組裝要批量插入的資料集合
private void prepareDataForBatchInsert(List<ContactPhone> contactList, List<MiddleField> middleList, List<ApplicationSumary> synDataList) {
    for (ApplicationSumary application : synDataList
            ) {
        MiddleField middleField = new MiddleField(application.getSid(), application.getUnitAddressCode(), application.getCustunitAddress(), application.getRegistAddressCode(), application.getRegistAddress()
                , application.getLiveAddressCode(), application.getLiveAddress(), application.getCustQq(), application.getCustWechatcode(), application.getAppStatus(), application.getAppStatusCode());
        middleList.add(middleField);

        //客戶電話號
if (application.getPhoneNubmer() != null) {
            ContactPhone phoneForSelf = new ContactPhone();
            contactList.add(phoneForSelf);
        };
        //銷售人員電話
if (application.getSalesPhone() != null) {
            ContactPhone phoneForSales = new ContactPhone();
            contactList.add(phoneForSales);
        };
        //家庭聯絡人電話
if (application.getFamilyPhone() != null) {
            ContactPhone phoneForFamily = new ContactPhone();
            contactList.add(phoneForFamily);
        };
        //緊急聯絡人電話
if (application.getOtherContactphone() != null) {
            ContactPhone phoneForOther = new ContactPhone();
            contactList.add(phoneForOther);
        };
        //工作證明人電話
if (application.getFalyPhone() != null) {
            ContactPhone phoneForFaly = new ContactPhone();
            contactList.add(phoneForFaly);
        };

    }
}
//判斷是否可以開始同步,這裡只允許晚上0點到早上八點允許執行
private boolean isSynStart(long currentSid, long maxSid) {
    Date date = new Date();
    int hours = date.getHours();
    if ((hours >= 0 || hours <= 7) && currentSid < maxSid) {
        return true;
    } else {
        return false;
    }
}
//多執行緒執行任務一:給聯絡人表中批量插入資料
class contactPhoneCallable implements Callable<Integer>{
     private List<ContactPhone> runList;

    public contactPhoneCallable(List<ContactPhone> runList) {
        this.runList = runList;
    }
    
    @Override
public Integer call() {
        try {
            int addNumber = contactPhoneMapper.batchAdd(runList);
            return addNumber;
        } catch (Exception e) {
            return -1;
        }
    }
}
//多執行緒執行的任務二;給中間資訊表中批量插入資料
class middleFiledCallable implements Callable<Integer>{
    private List<MiddleField> runList;

    public middleFiledCallable(List<MiddleField> runList) {
        this.runList = runList;
    }

    @Override
public Integer call() {
        try {
            int middleNumber = middleFieldMapper.batchAdd(runList);
            return middleNumber;
        } catch (Exception e) {
            return -1;
        }
    }
}
}

最後給出分頁查詢和批量插入的sql語句:

mybatis定義的方法:
List<ApplicationSumary> getSynData(@Param("currentPage") long currentPage, @Param("currentSid") long currentSid, @Param("maxSid") long maxSid);

分頁查詢sql:

<select id="getSynData" resultMap="BaseResultMap">
  select
  <include refid="Base_Column_List" />
  from t_application_sumary
  where sid<![CDATA[ <= ]]>#{maxSid} and sid> #{currentSid} limit #{currentPage},#{currentPage+1000}
</select>
mybatis定義的方法:
int batchAdd(List<ContactPhone> contactList);

批量插入sql:
<insert id="batchAdd" parameterType="java.util.List">
  insert into t_contact_phone (sid,phone_type,phone_number,phone_name)
  values
  <foreach collection="list" item="item" index="index" separator=",">
    (#{item.sid,jdbcType=BIGINT}, #{item.phoneType,jdbcType=VARCHAR}, #{item.phoneNumber,jdbcType=char},
    #{item.phoneName,jdbcType=VARCHAR})
  </foreach>
</insert>


相關推薦

mysql函式批量插入資料+java執行併發案例

當前需要做個測試,mysql資料庫需要5000萬資料來完成效能測試,於是我們寫了個函式來完成這個功能。 函式定義如下: #批量插入資料庫的函式 BEGIN     #定義需要插入資料庫表中的欄位;       DECLARE id BIGINT(20);     DECLA

Java執行/併發25、Exchanger執行資料交換

Exchanger用於執行緒間的資料交換。它提供一個同步點,在這個同步點兩個執行緒可以交換彼此的資料。 這句話說到兩個關鍵點: Exchanger只能用於兩個執行緒互相交換資料。如果有三個執行緒呢?對不起,臣妾做不到…… Exchanger會產生一個同步點

Java執行/併發05、synchronized應用例項:執行間操作共享資料

電商平臺中最重要的一點就是賣東西。同個商品不能無限制的賣下去的,因為商品有庫存量,超過庫存就不能賣了。 這裡,約定一個規則,下單使庫存減n,取消訂單使庫存加m。庫存數量不可以小於0。 假設平臺上同時有很多使用者在操作,在不考慮效率的情況下,我們用同步方法來模

java執行併發庫高階應用 之 執行範圍內共享資料

轉自:http://blog.csdn.net/xushuaic/article/category/1335611 筆記摘要:  所謂執行緒範圍內共享資料,即對於相同的程式程式碼,多個模組在同一個執行緒中執行時要共享一份資料,而在另外執行緒中執行時又共

非結構化資料與結構化資料提取---執行爬蟲案例

多執行緒糗事百科案例 案例要求參考上一個糗事百科單程序案例 Queue(佇列物件) Queue是python中的標準庫,可以直接import Queue引用;佇列是執行緒間最常用的交換資料的形式 python下多執行緒的思考 對於資源,加鎖是個重要的環節。因為python原生的list,dict等,

Java執行-併發執行

執行緒池有了解嗎? 答: java.util.concurrent.ThreadPoolExecutor 類就是一個執行緒池。客戶端呼叫ThreadPoolExecutor.submit(Runnable task) 提交任務,執行緒池內部維護的工作者執行緒的數量就是該執行緒池的執行

Java執行-併發之synchronized 關鍵字

synchronized 關鍵字 答: 底層實現: 進入時,執行 monitorenter,將計數器 +1,釋放鎖 monitorexit 時,計數器 -1 當一個執行緒判斷到計數器為 0 時,則當前鎖空閒,可以佔用;反之,當前執行緒進入等待狀態 含義

Java執行-併發之sleep() 和 wait(n) 、 wait() 的區別

sleep() 和 wait(n) 、 wait() 的區別 答: sleep 方法:是 Thread 類的靜態方法,當前執行緒將睡眠 n 毫秒,執行緒進入阻塞狀態。當睡眠時間到了,會接觸阻塞,進入可執行狀態,等待 CPU 的到來。睡眠不釋放鎖(如果有的話) wai

Java執行-併發執行產生死鎖的4個必要條件?如何避免死鎖?

多執行緒產生死鎖的4個必要條件? 答: 互斥條件:一個資源每次只能被一個執行緒使用 請求與保持條件:一個執行緒因請求資源而阻塞時,對已獲得的資源保持不放 不剝奪條件:程序已經獲得的資源,在未使用完之前,不能強行剝奪 迴圈等待條件:若干執行緒之間形成一種頭

Java執行-併發執行和程序的區別

執行緒和程序的區別 答: 程序是一個“執行中的程式”,是系統進行資源分配和排程的一個獨立單位 執行緒是程序的一個實體,一個程序中擁有多個執行緒,執行緒之間共享地址空間和其他資源(所以通訊和同步等操作執行緒比程序更加容易) 執行緒上下文的切換比程序上下文切換要快

Java執行-併發之如何制定執行執行順序?

文章目錄 如何讓10個執行緒按照順序列印0123456789? 程式碼如下: 1.建立一個鎖物件類 2.建立一個執行緒類 3.測試類 如何讓10個執行緒按照順序列印012

java執行入門案例(2)之執行簡單應用

  上一篇文章:java多執行緒案例(1)之簡單銀行取款問題及其優化 我大概介紹了一下Java程式碼優化的問題,主要針對出學者而言,這一次我要介紹多執行緒應用的簡單案例 。網上有許多多執行緒的案例,但大多都挺複雜的,今天我主要目的也是介紹一下多執行緒應用的簡單案例,讓初學

java執行入門案例(1)之簡單銀行取款問題及其優化

      之前學java的時候,學校老師由於課時問題,沒有教我們多執行緒,最近學校外聘的企業教師給我們做實訓要用到多執行緒,因此才初步瞭解多執行緒,以下這個案例只是介紹java程式碼優化,程式碼中有涉及多執行緒,但今天不對其講解,只簡單講解程式碼優化的問題,希望對初學

Java執行併發總結】Thread類的常用方法(join、yield等)---執行的基礎操作篇

 啟動(start)   最基本的操作,呼叫Runnable中的run方法,無返回值。 new Thread(new Test()).start(); 休眠(sleep)  使當前執行緒休眠一段時間,預設為毫秒級,最高可以精確到納秒,呼叫的方法為slee

java 執行併發實質

首先我們都知道多執行緒在獲取共享資源時,往往會出現意想不到的結果,這是為什麼呢?執行緒獲取共享資源的過程如下圖: 首先我們需要了解jvm記憶體結構,在這裡不過多說明,由上圖我們可以知道,在jvm記憶體中分為獨立記憶體和共享記憶體,獨立記憶體是我們每個執行緒獨有的資訊,而

JAVA執行併發Demo

一個最簡單的多執行緒併發demo: 主函式: public class multithreadReq { private static final int THREADNUM = 5;/

一、Java執行併發同步之Semaphore

概念 Semaphore是一種在多執行緒環境下使用的設施,該設施負責協調各個執行緒,用來管理資源,以保證它們能夠正確、合理的使用公共資源的設施,也是作業系統中用於控制程序同步互斥的量。用我們常見的說法就是用來控制併發數。 訊號量是一個非負整數 。 業務場景 以售

Java執行/併發20、Future實現類:FutureTask

FutureTask是future的實現類,它同時實現了兩個介面:Runnable和Future,所以它既可以作為Runnable被執行緒執行,又可以作為Future得到Callable的返回值。 因此我們可以: - 呼叫FutureTask物件的ru

二、Java執行併發同步之CyclicBarrier

概述 CyclicBarrier:可迴圈屏障,允許一組執行緒全部等待的同步輔助工具。一組執行緒互相等待,直到所有執行緒都到達某個公共屏障點(也可以叫同步點) 。它可以在等待執行緒之後重新使用。這個屏障之所以用迴圈修飾,是因為在所有的執行緒釋放彼此之後,這個屏障是

java執行併發系列之閉鎖(Latch)和柵欄(CyclicBarrier)

-閉鎖(Latch) 閉鎖(Latch):一種同步方法,可以延遲執行緒的進度直到執行緒到達某個終點狀態。通俗的講就是,一個閉鎖相當於一扇大門,在大門開啟之前所有執行緒都被阻斷,一旦大門開啟所有執行緒都將通過,但是一旦大門開啟,所有執行緒都通過了,那麼這個閉鎖的狀態就失效了,門