mysql函式大批量插入資料+java多執行緒併發案例
當前需要做個測試,mysql資料庫需要5000萬資料來完成效能測試,於是我們寫了個函式來完成這個功能。
函式定義如下:
#批量插入資料庫的函式
BEGIN
#定義需要插入資料庫表中的欄位;
DECLARE id BIGINT(20);
DECLARE phone_number varchar(11);
#定義變數控制插入資料庫表的行數
DECLARE sid bigint(20) DEFAULT 1;
DECLARE count bigint(20);
declare total bigint(20) DEFAULT 50000000;
#開啟迴圈判斷
while sid < total do
SET count = sid + 100000; //每次插入資料庫條數,防止條數太多提交失敗;
#開始事務
start transaction;
WHILE (sid < count) DO //迴圈插入資料庫資料
SET id = sid;
SET phone_number = CONCAT(1, ROUND(ROUND(RAND(),10)*10000000000));
INSERT INTO t_contact_phone(sid,phone_number)
VALUES(id,phone_number);
#累計行數
SET sid = sid+1;
END WHILE;
commit; //提交事務
end while; //結束迴圈
END
執行上述函式即可建立5000萬條資料在測試庫用於測試。
當我們需要同時執行大批量操作,那如果依然順序單執行緒執行,會比較耗時,為了提高效率,我們需要通過多執行緒非同步方式來提高效率;
比方說大批量資料的同步方案,只能通過批量查詢+多執行緒+非同步的方式來完成。才能更短的時間內更高效的完成任務。
public class SynchronizationData {
private static final Logger logger = LoggerFactory.getLogger(SynchronizationData.class); @Autowired private ApplicationSumaryMapper applicationSumaryMapper;//需要同步的資料表(mybatis完成) @Autowired private ContactPhoneMapper contactPhoneMapper;//需要插入的資料表一(mybatis完成) @Autowired private MiddleFieldMapper middleFieldMapper; //需要插入的資料表二(mybatis完成) private static ExecutorService executor = Executors.newFixedThreadPool(2); //定義執行緒池用於多執行緒執行。 //需要同步總條數 private long totalNumber = 0; //需要同步的最大進件id; private long maxSid = 0; //當前已經同步的sid private long currentSid = 0; private long currentPage = 0; private long maxPage = 0; public void startSynData() { //獲取當前系統中最大的sid maxSid = applicationSumaryMapper.getMaxSid(); logger.info("需要同步的最大進件id(SID)為:", maxSid); //根據sid獲取總條數 totalNumber = applicationSumaryMapper.getTotalNumber(maxSid); logger.info("需要同步的進件總數為:", totalNumber); maxPage = totalNumber / 1000 + 1; logger.info("最大頁數為:", maxPage); //判斷能否開始執行,執行時間為晚上12點到早上8點 boolean synStart = isSynStart(currentSid, maxSid); if (synStart) { //開始分頁查詢 for (long i = 0; i <= maxPage;) { List<ContactPhone> contactList = new ArrayList<ContactPhone>(); List<MiddleField> middleList = new ArrayList<MiddleField>(); currentPage = i;
//查詢需要同步的資料集 List<ApplicationSumary> synDataList = applicationSumaryMapper.getSynData(currentPage, currentSid, maxSid); //遍歷集合組裝資料 prepareDataForBatchInsert(contactList, middleList, synDataList); i = batchAddData(i, contactList, middleList, synDataList); } } else { //不符合同步條件,啥也不做 } } @Transactional //通過事務控制同成功,同失敗。 private long batchAddData(long i, List<ContactPhone> contactList, List<MiddleField> middleList, List<ApplicationSumary> synDataList) { //非同步批量插入contactList Future<Integer> contactNumber = executor.submit(new contactPhoneCallable(contactList)); //非同步批量插入middleList Future<Integer> middleNumber = executor.submit(new middleFiledCallable(middleList)); try { Integer number1 = contactNumber.get(); Integer number2 = middleNumber.get(); //如果都成功則繼續 if (number1 != -1 && number2 != -1) { i++; } else { //失敗則繼續當前頁執行 } } catch (Exception e) { e.printStackTrace(); //異常並記錄當前的sid logger.info("異常最小的sid為:"+synDataList.get(0).getSid()); logger.info("異常最大的sid為:"+synDataList.get(synDataList.size()-1).getSid()); } return i; } //遍歷要同步的資料集合組裝要批量插入的資料集合 private void prepareDataForBatchInsert(List<ContactPhone> contactList, List<MiddleField> middleList, List<ApplicationSumary> synDataList) { for (ApplicationSumary application : synDataList ) { MiddleField middleField = new MiddleField(application.getSid(), application.getUnitAddressCode(), application.getCustunitAddress(), application.getRegistAddressCode(), application.getRegistAddress() , application.getLiveAddressCode(), application.getLiveAddress(), application.getCustQq(), application.getCustWechatcode(), application.getAppStatus(), application.getAppStatusCode()); middleList.add(middleField); //客戶電話號 if (application.getPhoneNubmer() != null) { ContactPhone phoneForSelf = new ContactPhone(); contactList.add(phoneForSelf); }; //銷售人員電話 if (application.getSalesPhone() != null) { ContactPhone phoneForSales = new ContactPhone(); contactList.add(phoneForSales); }; //家庭聯絡人電話 if (application.getFamilyPhone() != null) { ContactPhone phoneForFamily = new ContactPhone(); contactList.add(phoneForFamily); }; //緊急聯絡人電話 if (application.getOtherContactphone() != null) { ContactPhone phoneForOther = new ContactPhone(); contactList.add(phoneForOther); }; //工作證明人電話 if (application.getFalyPhone() != null) { ContactPhone phoneForFaly = new ContactPhone(); contactList.add(phoneForFaly); }; } } //判斷是否可以開始同步,這裡只允許晚上0點到早上八點允許執行 private boolean isSynStart(long currentSid, long maxSid) { Date date = new Date(); int hours = date.getHours(); if ((hours >= 0 || hours <= 7) && currentSid < maxSid) { return true; } else { return false; } }
//多執行緒執行任務一:給聯絡人表中批量插入資料 class contactPhoneCallable implements Callable<Integer>{ private List<ContactPhone> runList; public contactPhoneCallable(List<ContactPhone> runList) { this.runList = runList; } @Override public Integer call() { try { int addNumber = contactPhoneMapper.batchAdd(runList); return addNumber; } catch (Exception e) { return -1; } } } //多執行緒執行的任務二;給中間資訊表中批量插入資料 class middleFiledCallable implements Callable<Integer>{ private List<MiddleField> runList; public middleFiledCallable(List<MiddleField> runList) { this.runList = runList; } @Override public Integer call() { try { int middleNumber = middleFieldMapper.batchAdd(runList); return middleNumber; } catch (Exception e) { return -1; } } }
}
最後給出分頁查詢和批量插入的sql語句:
mybatis定義的方法:
List<ApplicationSumary> getSynData(@Param("currentPage") long currentPage, @Param("currentSid") long currentSid, @Param("maxSid") long maxSid);
分頁查詢sql:
<select id="getSynData" resultMap="BaseResultMap"> select <include refid="Base_Column_List" /> from t_application_sumary where sid<![CDATA[ <= ]]>#{maxSid} and sid> #{currentSid} limit #{currentPage},#{currentPage+1000} </select>
mybatis定義的方法:
int batchAdd(List<ContactPhone> contactList);
批量插入sql:
<insert id="batchAdd" parameterType="java.util.List"> insert into t_contact_phone (sid,phone_type,phone_number,phone_name) values <foreach collection="list" item="item" index="index" separator=","> (#{item.sid,jdbcType=BIGINT}, #{item.phoneType,jdbcType=VARCHAR}, #{item.phoneNumber,jdbcType=char}, #{item.phoneName,jdbcType=VARCHAR}) </foreach> </insert>
相關推薦
mysql函式大批量插入資料+java多執行緒併發案例
當前需要做個測試,mysql資料庫需要5000萬資料來完成效能測試,於是我們寫了個函式來完成這個功能。 函式定義如下: #批量插入資料庫的函式 BEGIN #定義需要插入資料庫表中的欄位; DECLARE id BIGINT(20); DECLA
Java多執行緒/併發25、Exchanger執行緒資料交換
Exchanger用於執行緒間的資料交換。它提供一個同步點,在這個同步點兩個執行緒可以交換彼此的資料。 這句話說到兩個關鍵點: Exchanger只能用於兩個執行緒互相交換資料。如果有三個執行緒呢?對不起,臣妾做不到…… Exchanger會產生一個同步點
Java多執行緒/併發05、synchronized應用例項:執行緒間操作共享資料
電商平臺中最重要的一點就是賣東西。同個商品不能無限制的賣下去的,因為商品有庫存量,超過庫存就不能賣了。 這裡,約定一個規則,下單使庫存減n,取消訂單使庫存加m。庫存數量不可以小於0。 假設平臺上同時有很多使用者在操作,在不考慮效率的情況下,我們用同步方法來模
java多執行緒併發庫高階應用 之 執行緒範圍內共享資料
轉自:http://blog.csdn.net/xushuaic/article/category/1335611 筆記摘要: 所謂執行緒範圍內共享資料,即對於相同的程式程式碼,多個模組在同一個執行緒中執行時要共享一份資料,而在另外執行緒中執行時又共
非結構化資料與結構化資料提取---多執行緒爬蟲案例
多執行緒糗事百科案例 案例要求參考上一個糗事百科單程序案例 Queue(佇列物件) Queue是python中的標準庫,可以直接import Queue引用;佇列是執行緒間最常用的交換資料的形式 python下多執行緒的思考 對於資源,加鎖是個重要的環節。因為python原生的list,dict等,
Java多執行緒-併發之執行緒池
執行緒池有了解嗎? 答: java.util.concurrent.ThreadPoolExecutor 類就是一個執行緒池。客戶端呼叫ThreadPoolExecutor.submit(Runnable task) 提交任務,執行緒池內部維護的工作者執行緒的數量就是該執行緒池的執行
Java多執行緒-併發之synchronized 關鍵字
synchronized 關鍵字 答: 底層實現: 進入時,執行 monitorenter,將計數器 +1,釋放鎖 monitorexit 時,計數器 -1 當一個執行緒判斷到計數器為 0 時,則當前鎖空閒,可以佔用;反之,當前執行緒進入等待狀態 含義
Java多執行緒-併發之sleep() 和 wait(n) 、 wait() 的區別
sleep() 和 wait(n) 、 wait() 的區別 答: sleep 方法:是 Thread 類的靜態方法,當前執行緒將睡眠 n 毫秒,執行緒進入阻塞狀態。當睡眠時間到了,會接觸阻塞,進入可執行狀態,等待 CPU 的到來。睡眠不釋放鎖(如果有的話) wai
Java多執行緒-併發之多執行緒產生死鎖的4個必要條件?如何避免死鎖?
多執行緒產生死鎖的4個必要條件? 答: 互斥條件:一個資源每次只能被一個執行緒使用 請求與保持條件:一個執行緒因請求資源而阻塞時,對已獲得的資源保持不放 不剝奪條件:程序已經獲得的資源,在未使用完之前,不能強行剝奪 迴圈等待條件:若干執行緒之間形成一種頭
Java多執行緒-併發之執行緒和程序的區別
執行緒和程序的區別 答: 程序是一個“執行中的程式”,是系統進行資源分配和排程的一個獨立單位 執行緒是程序的一個實體,一個程序中擁有多個執行緒,執行緒之間共享地址空間和其他資源(所以通訊和同步等操作執行緒比程序更加容易) 執行緒上下文的切換比程序上下文切換要快
Java多執行緒-併發之如何制定多個執行緒的執行順序?
文章目錄 如何讓10個執行緒按照順序列印0123456789? 程式碼如下: 1.建立一個鎖物件類 2.建立一個執行緒類 3.測試類 如何讓10個執行緒按照順序列印012
java多執行緒入門案例(2)之多執行緒簡單應用
上一篇文章:java多執行緒案例(1)之簡單銀行取款問題及其優化 我大概介紹了一下Java程式碼優化的問題,主要針對出學者而言,這一次我要介紹多執行緒應用的簡單案例 。網上有許多多執行緒的案例,但大多都挺複雜的,今天我主要目的也是介紹一下多執行緒應用的簡單案例,讓初學
java多執行緒入門案例(1)之簡單銀行取款問題及其優化
之前學java的時候,學校老師由於課時問題,沒有教我們多執行緒,最近學校外聘的企業教師給我們做實訓要用到多執行緒,因此才初步瞭解多執行緒,以下這個案例只是介紹java程式碼優化,程式碼中有涉及多執行緒,但今天不對其講解,只簡單講解程式碼優化的問題,希望對初學
【Java多執行緒併發總結】Thread類的常用方法(join、yield等)---執行緒的基礎操作篇
啟動(start) 最基本的操作,呼叫Runnable中的run方法,無返回值。 new Thread(new Test()).start(); 休眠(sleep) 使當前執行緒休眠一段時間,預設為毫秒級,最高可以精確到納秒,呼叫的方法為slee
java 多執行緒併發實質
首先我們都知道多執行緒在獲取共享資源時,往往會出現意想不到的結果,這是為什麼呢?執行緒獲取共享資源的過程如下圖: 首先我們需要了解jvm記憶體結構,在這裡不過多說明,由上圖我們可以知道,在jvm記憶體中分為獨立記憶體和共享記憶體,獨立記憶體是我們每個執行緒獨有的資訊,而
JAVA多執行緒併發Demo
一個最簡單的多執行緒併發demo: 主函式: public class multithreadReq { private static final int THREADNUM = 5;/
一、Java多執行緒併發同步之Semaphore
概念 Semaphore是一種在多執行緒環境下使用的設施,該設施負責協調各個執行緒,用來管理資源,以保證它們能夠正確、合理的使用公共資源的設施,也是作業系統中用於控制程序同步互斥的量。用我們常見的說法就是用來控制併發數。 訊號量是一個非負整數 。 業務場景 以售
Java多執行緒/併發20、Future實現類:FutureTask
FutureTask是future的實現類,它同時實現了兩個介面:Runnable和Future,所以它既可以作為Runnable被執行緒執行,又可以作為Future得到Callable的返回值。 因此我們可以: - 呼叫FutureTask物件的ru
二、Java多執行緒併發同步之CyclicBarrier
概述 CyclicBarrier:可迴圈屏障,允許一組執行緒全部等待的同步輔助工具。一組執行緒互相等待,直到所有執行緒都到達某個公共屏障點(也可以叫同步點) 。它可以在等待執行緒之後重新使用。這個屏障之所以用迴圈修飾,是因為在所有的執行緒釋放彼此之後,這個屏障是
java多執行緒併發系列之閉鎖(Latch)和柵欄(CyclicBarrier)
-閉鎖(Latch) 閉鎖(Latch):一種同步方法,可以延遲執行緒的進度直到執行緒到達某個終點狀態。通俗的講就是,一個閉鎖相當於一扇大門,在大門開啟之前所有執行緒都被阻斷,一旦大門開啟所有執行緒都將通過,但是一旦大門開啟,所有執行緒都通過了,那麼這個閉鎖的狀態就失效了,門