1. 程式人生 > >高併發程式設計之高併發場景:秒殺(無鎖、排他鎖、樂觀鎖、redis快取的逐步演變)

高併發程式設計之高併發場景:秒殺(無鎖、排他鎖、樂觀鎖、redis快取的逐步演變)

環境:

jdk1.8;spring boot2.0.2;Maven3.3

摘要說明:

在實際開發過程中往往會出現許多高併發場場景,秒殺,強紅包,搶優惠卷等;

其中:

秒殺場景的特點就是單位時間湧入使用者量極大,商品數少,且要保證不可超量銷售;

秒殺產品的本質就是減庫存;

秒殺場景常用的解決方案有限流、削峰、拓展等

本篇以秒殺場景為依據來主要從程式碼開發的角度闡述從無鎖——》排他鎖——》共享鎖——》快取中介軟體的一步步升級來不斷完善及優化;同時也針對整體架構提出一些優化方案;

步驟:

1.準備高併發測試工具類

引入高併發程式設計的工具類:java.util.concurrent.CountDownLatch(發令槍)來進行模擬大批量使用者高併發測試;

java.util.concurrent.CountDownLatch(發令槍):一個同步輔助類,控制一組執行緒的啟動,當一組執行緒未完全準備好之前控制準備好一個或多個執行緒一直等待。猶如倒計時計數器,呼叫CountDownLatch物件的countDown方法就將計數器減1,當計數到達0時,則意味著這組執行緒完全準備好。此時通知所有等待者即整組執行緒同時開始執行。

package com.example.demo_20180925;

import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.test.context.junit4.SpringRunner;

import com.example.demo_20180925.pojo.ProductInfo;
import com.example.demo_20180925.service.ProductInfoService;

@RunWith(SpringRunner.class)
// 引入SpringBootTest並生成隨機介面
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class Demo20180925ApplicationTests {

	// 商品程式碼
	private static final String CODE = "IPONE XR";
	// 商品總數
	private static final Long PRODUCTCOUNT = (long) 1000;
	// 併發人數
	private static final int USER_NUM = 1000;
	// 發令槍;用於模擬高併發
	private static CountDownLatch countDownLatch = new CountDownLatch(USER_NUM);
	// 計數器,用於記錄成功購買客戶人數
	private static int successPerson = 0;
	// 計數器,用於記錄賣出去對的商品個數
	private static int saleOutNum = 0;
	// 計數器,用於記錄處理總時間
	private static long doTime = 0;
	// 計數器,用於記錄處理最長時間
	private static long maxTime = 0;
	@Autowired
	ProductInfoService productInfoService;

	@Before
	public void init() {
		// 初始化庫存
		ProductInfo productInfo = new ProductInfo();
		productInfo.setProductCode(CODE);
		productInfo.setProductCount(PRODUCTCOUNT);
		this.productInfoService.updateFirst(productInfo);
	}

	@Test
	public void testSeckill() throws InterruptedException {
		// 迴圈初始換USER_NUM個請求例項
		for (int i = 0; i < USER_NUM; i++) {
			new Thread(new BuyProduct(CODE, (long) 3)).start();
			if (i == USER_NUM) {
				Thread.currentThread().sleep(2000);// 最後一個子執行緒時休眠兩秒等待所有子執行緒全部準備好
			}
			countDownLatch.countDown();// 發令槍減1,到0時啟動發令槍
		}
		Thread.currentThread().sleep(30 * 1000);// 主執行緒休眠10秒等待結果
		// Thread.currentThread().join();
		System.out.println("購買成功人數:" + successPerson);
		System.out.println("銷售成功個數:" + saleOutNum);
		System.out.println("剩餘個數:"
				+ productInfoService.selectByCode(CODE).getProductCount());
		System.out.println("處理時間:" + doTime);
	}

	public class BuyProduct implements Runnable {
		private String code;
		private Long buys;

		public BuyProduct(String code, Long buys) {
			this.code = code;
			this.buys = buys;
		}

		public void run() {
			try {
				countDownLatch.await();// 所有子執行緒執行到這裡休眠,等待發令槍指令
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			// 直接減庫存
			// Map<String, Object> map = productInfoService.update(code, buys);

			// 加排他鎖(悲觀鎖)後減庫存
			Map<String, Object> map = productInfoService.selectForUpdate(code,
					buys);

			// 根據版本號加樂觀鎖減庫存
			// Map<String, Object> map =
			// productInfoService.updateByVersion(code,
			// buys);

			// 根據庫存加樂觀鎖減庫存
			// Map<String, Object> map = productInfoService.updateByBuys(code,
			// buys);

			// 根據快取減庫存
			// Map<String, Object> map = productInfoService.updateByRedis(code,
			// buys);
			if ((boolean) map.get("result")) {
				synchronized (countDownLatch) {
					// 更新庫存成功,修改購買成功人數及銷售產品數量
					successPerson++;
					// 記錄總購買成功人數
					saleOutNum = (int) (saleOutNum + buys);
					// 記錄總消費時間
					doTime = doTime + (long) map.get("time");
					// 記錄最大時間
					if (maxTime < (long) map.get("time")) {
						maxTime = (long) map.get("time");
					}
				}
			}
		}
	}
}

2.無鎖開發

許多開發對高併發程式設計及資料庫鎖的機制不是很理解,開發時就很簡單的查詢庫存然後更新庫存即select——》update;此時在高併發的環境下測試就會出現產品過多銷售的情況;

@Service
public class ProductInfoServiceImpl implements ProductInfoService {
	@Autowired
	ProductInfoMapper productInfoMapper;
	@Autowired
	RedisTemplate<String, Object> redisTemplate;
	private static long threadCount = 0;

	/**
	 * 根據產品程式碼查詢產品
	 */
	@Override
	public ProductInfo selectByCode(String code) {
		return productInfoMapper.findByCode(code);
	}

	/**
	 * 初始化庫存
	 */
	@Override
	public boolean updateFirst(ProductInfo productInfo) {
		redisTemplate.opsForHash().put("productCount",
				productInfo.getProductCode(), productInfo.getProductCount());
		return productInfoMapper.updateForFirst(productInfo.getProductCode(),
				productInfo.getProductCount()) > 0 ? true : false;
	}

	/**
	 * 直接減庫存
	 */
	@Override
	public Map<String, Object> update(String code, Long buys) {
		threadCount++;
		System.out.println("開啟執行緒:" + threadCount);
		Date date = new Date();
		Map<String, Object> map = new HashMap<String, Object>();
		ProductInfo productInfo = productInfoMapper.findByCode(code);
		if (productInfo.getProductCount() < buys) {
			map.put("result", false);
			Date date1 = new Date();
			map.put("time", date1.getTime() - date.getTime());
			return map;
		}
		map.put("result", productInfoMapper.update(code, buys) > 0 ? true
				: false);
		Date date1 = new Date();
		map.put("time", date1.getTime() - date.getTime());
		return map;
	}
}

我們執行下可以看到結果:

1000個人,每個人購買3個商品,商品總數1000個,理論上購買成功人數應該為333,商品銷售成功個數應該為999;

但實際購買成功人數560,銷售商品個數為1680;遠遠的過度銷售;

開啟執行緒:993
開啟執行緒:991
開啟執行緒:996
開啟執行緒:995
開啟執行緒:997
開啟執行緒:998
開啟執行緒:999
開啟執行緒:1000
購買成功人數:560
銷售成功個數:1680
剩餘個數:-680
處理時間:2424240
最大處理時間:3220

2.鎖的分類

資料庫鎖的機制從不同角度出發可進行不同的分類:

顆粒度上可劃分(基於mysql):

  • 表級鎖:表級鎖是MySQL中鎖定粒度最大的一種鎖,表示對當前操作的整張表加鎖,它實現簡單,資源消耗較少,被大部分MySQL引擎支援;特點:開銷小,加鎖快;不會出現死鎖;鎖定粒度大,發出鎖衝突的概率最高,併發度最低。
  • 行級鎖:行級鎖是Mysql中鎖定粒度最細的一種鎖,表示只針對當前操作的行進行加鎖。行級鎖能大大減少資料庫操作的衝突。其加鎖粒度最小,但加鎖的開銷也最大。特點:開銷大,加鎖慢;會出現死鎖;鎖定粒度最小,發生鎖衝突的概率最低,併發度也最高。
  • 頁級鎖:頁級鎖是MySQL中鎖定粒度介於行級鎖和表級鎖中間的一種鎖。表級鎖速度快,但衝突多,行級衝突少,但速度慢。所以取了折衷的頁級,一次鎖定相鄰的一組記錄。BDB支援頁級鎖;特點:開銷和加鎖時間界於表鎖和行鎖之間;會出現死鎖;鎖定粒度界於表鎖和行鎖之間,併發度一般

級別上進行劃分:

  • 共享鎖:又稱讀鎖,是讀取操作建立的鎖。其他使用者可以併發讀取資料,但任何事務都不能對資料進行修改(獲取資料上的排他鎖),直到已釋放所有共享鎖。如果事務T對資料A加上共享鎖後,則其他事務只能對A再加共享鎖,不能加排他鎖。獲准共享鎖的事務只能讀資料,不能修改資料。
  • 排他鎖:又稱寫鎖,如果事務T對資料A加上排他鎖後,則其他事務不能再對A加任任何型別的封鎖。獲准排他鎖的事務既能讀資料,又能修改資料。

使用方式上進行劃分:

  • 樂觀鎖:是一種併發控制的方法。它假設多使用者併發的事務在處理時不會彼此互相影響,各事務能夠在不產生鎖的情況下處理各自影響的那部分資料。在提交資料更新之前,每個事務會先檢查在該事務讀取資料後,有沒有其他事務又修改了該資料。如果其他事務有更新的話,正在提交的事務會進行回滾
  • 悲觀鎖:是一種併發控制的方法。它可以阻止一個事務以影響其他使用者的方式來修改資料。如果一個事務執行的操作都某行資料應用了鎖,那只有當這個事務把鎖釋放,其他事務才能夠執行與該鎖衝突的操作。悲觀併發控制主要用於資料爭用激烈的環境,以及發生併發衝突時使用鎖保護資料的成本要低於回滾事務的成本的環境中。

操作上進行劃分:

  • DML鎖:用於保護資料的完整性,其中包括行級鎖(Row Locks (TX鎖))、表級鎖(table lock(TM鎖))。
  • DDL鎖:用於保護資料庫物件的結構,如表、索引等的結構定義。其中包排他DDL鎖(Exclusive DDL lock)、共享DDL鎖(Share DDL lock)、可中斷解析鎖(Breakable parse locks)

3.排他鎖(悲觀鎖)開發

通過上面對鎖的機制介紹之後我們可以看到,排他鎖可以很好的解決我們上面商品多銷售的問題;排他鎖的本質即排隊執行;

mysql的排他鎖的用法為:SELECT ... FOR UPDATE;

在查詢語句後面增加FOR UPDATE,Mysql會對查詢結果中的每行都加排他鎖,當沒有其他執行緒對查詢結果集中的任何一行使用排他鎖時,可以成功申請排他鎖,否則會被阻塞。所以它本質上也是一個行級鎖;

	/**
	 * 根據產品程式碼查詢產品資訊;排他鎖
	 * 
	 * @param code
	 *            產品程式碼
	 * @return
	 */
	@Select("SELECT id,version,product_code as productCode,product_name as productName, product_count AS productCount FROM product_info WHERE product_code = #{code} for update")
	ProductInfo selectForUpdate(@Param("code") String code);

業務層:

	/**
	 * 加排他鎖減庫存
	 */
	@Transactional
	@Override
	public Map<String, Object> selectForUpdate(String code, Long buys) {
		threadCount++;
		System.out.println("開啟執行緒:" + threadCount);
		Date date = new Date();
		Map<String, Object> map = new HashMap<String, Object>();
		ProductInfo productInfo = productInfoMapper.selectForUpdate(code);
		if (productInfo.getProductCount() < buys) {
			map.put("result", false);
			Date date1 = new Date();
			map.put("time", date1.getTime() - date.getTime());
			return map;
		}
		map.put("result", productInfoMapper.update(code, buys) > 0 ? true
				: false);
		Date date1 = new Date();
		map.put("time", date1.getTime() - date.getTime());
		return map;
	}

執行結果:

開啟執行緒:979
開啟執行緒:980
開啟執行緒:981
開啟執行緒:982
開啟執行緒:983
開啟執行緒:984
購買成功人數:333
銷售成功個數:999
剩餘個數:1
處理時間:4101
最大處理時間:160

結果可以看到排他鎖可以很好的控制住商品數量的銷售;但排他鎖的本質是排隊,如果業務複雜或者併發人數過多的情況下會產生超時現象;

4.樂觀鎖開發

樂觀鎖的本質上並沒有使用資料庫本身的鎖機制;只是在提交的那一刻通過sql查詢條件來約束更新;

常規的是樂觀鎖一般有兩種:

一種是破壞表的業務介面新增版本號(version)或者時間戳(timestamp );

一種是使用業務本身做約束;

版本號形式共享鎖:

/**
	 * 根據購買數量及版本號減少庫存
	 * 
	 * @param code
	 *            產品程式碼
	 * @param buys
	 *            購買數量
	 * @param version
	 *            版本資訊
	 * @return
	 */
	@Update("update product_info SET product_count=product_count - #{buys},version=version+1 WHERE product_code = #{code} and version = #{version}")
	int updateByVersion(@Param("code") String code, @Param("buys") Long buys,
			@Param("version") Long version);

業務層:共享鎖當約束條件不滿足之後,需要在巢狀呼叫,直到滿足條件或商品銷售成功才停止;

	/**
	 * 根據版本號加樂觀鎖減庫存
	 */
	@Override
	public Map<String, Object> updateByVersion(String code, Long buys) {
		Date date = new Date();
		Map<String, Object> map = new HashMap<String, Object>();
		try {
			threadCount++;
			System.out.println("開啟執行緒:" + threadCount);
			ProductInfo productInfo = productInfoMapper.findByCode(code);
			if (productInfo.getProductCount() < buys) {
				map.put("result", false);
				Date date1 = new Date();
				map.put("time", date1.getTime() - date.getTime());
				return map;
			}
			if (productInfoMapper.updateByVersion(code, buys,
					productInfo.getVersion()) > 0 ? true : false) {
				map.put("result", true);
				Date date1 = new Date();
				map.put("time", date1.getTime() - date.getTime());
				return map;
			}
			waitForLock();
			return updateByVersion(code, buys);
		} catch (Exception e) {
			System.err.println(e);
			map.put("result", false);
			Date date1 = new Date();
			map.put("time", date1.getTime() - date.getTime());
			return map;
		}

	}
    // 錯峰執行
	private void waitForLock() {
		try {
			Thread.sleep(new Random().nextInt(10) + 1);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

執行結果:

開啟執行緒:1612
開啟執行緒:1613
開啟執行緒:1614
開啟執行緒:1615
開啟執行緒:1616
購買成功人數:333
銷售成功個數:999
剩餘個數:1
處理時間:428636
最大處理時間:3215

結果可以看到樂觀鎖鎖可以很好的控制住商品數量的銷售;但我們也可以看到樂觀鎖會導致執行緒的迴圈執行,若業務要求先到先得的話一定程度上是不滿足的;

通過業務銷量來實現共享鎖:

	/**
	 * 根據購買數量及剩餘庫存減少庫存
	 * 
	 * @param code
	 *            產品程式碼
	 * @param buys
	 *            購買數量
	 * @return
	 */
	@Update("update product_info SET product_count=product_count - #{buys} WHERE product_code = #{code} and (product_count - #{buys})>0")
	int updateByBuys(@Param("code") String code, @Param("buys") Long buys);

業務層:由於我們使用商品數量本身作為約束;故不需要做巢狀呼叫

	/**
	 * 根據庫存加樂觀鎖減庫存
	 */
	@Override
	public Map<String, Object> updateByBuys(String code, Long buys) {
		threadCount++;
		System.out.println("開啟執行緒:" + threadCount);
		Date date = new Date();
		Map<String, Object> map = new HashMap<String, Object>();
		try {
			ProductInfo productInfo = productInfoMapper.findByCode(code);
			if (productInfo.getProductCount() < buys) {
				map.put("result", false);
				Date date1 = new Date();
				map.put("time", date1.getTime() - date.getTime());
				return map;
			}
			if (productInfoMapper.updateByBuys(code, buys) > 0 ? true : false) {
				map.put("result", true);
			}else{
				map.put("result", false);
			}
			Date date1 = new Date();
			map.put("time", date1.getTime() - date.getTime());
			return map;
//			waitForLock();
//			return updateByBuys(code, buys);
		} catch (Exception e) {
			System.err.println(e);
			map.put("result", false);
			Date date1 = new Date();
			map.put("time", date1.getTime() - date.getTime());
			return map;

		}
	}

執行結果:

開啟執行緒:456
購買成功人數:333
銷售成功個數:999
剩餘個數:1
處理時間:487387
最大處理時間:2759

5.快取中介軟體

上述的使用資料庫的兩種鎖機制是可以很好的解決問題;但若是我們不斷增加併發數,就可以看到對資料庫會造成很大的壓力;實際生產環境,資料庫本身的資源壓力就很大;在這種關鍵入口處最好引入快取資料庫來過濾請求限流減少資料庫壓力;

本篇使用redis快取,其中redis快取和spring boot的集合這裡就不贅述,請自行檢視原始碼;

這裡使用的是redis資料庫的Hash型別中的Redis Hincrby 命令:用於為雜湊表中的欄位值加上指定增量值,增量值可為負。

HINCRBY KEY_NAME FIELD_NAME INCR_BY_NUMBER 
	@Autowired
	RedisTemplate<String, Object> redisTemplate;    

	@Override
	public Map<String, Object> updateByRedis(String code, Long buys) {
		threadCount++;
		System.out.println("開啟執行緒:" + threadCount);
		Date date = new Date();
		Map<String, Object> map = new HashMap<String, Object>();
		long count = Long.valueOf(redisTemplate.opsForHash().get(
				"productCount", code)
				+ "");
		if (count > 0) {
			count = Long.valueOf(redisTemplate.opsForHash().increment(
					"productCount", code, -buys));
			if (count >= 0) {
				map.put("result", true);
				Date date1 = new Date();
				map.put("time", date1.getTime() - date.getTime());
				return map;
			} else {
				map.put("result", false);
				Date date1 = new Date();
				map.put("time", date1.getTime() - date.getTime());
				return map;
			}
		} else {
			map.put("result", false);
			Date date1 = new Date();
			map.put("time", date1.getTime() - date.getTime());
			return map;
		}
	}

執行結果:

開啟執行緒:926
開啟執行緒:924
開啟執行緒:923
購買成功人數:333
銷售成功個數:999
剩餘個數:1000
處理時間:69702
最大處理時間:462

6.拓展

上述程式碼開發的演變不可能就此解決高併發帶給體統的壓力;這裡就闡述下從整體架構上去提高服務的併發能力:

展現層

禁止重複提交:使用者提交之後按鈕置灰,禁止重複提交 
使用者限流:在某一時間段內只允許使用者提交一次請求

代理層

動靜分離:將所有靜態資源放在apache httpd或者nginx服務下;減輕後端服務壓力;本身處理靜態資源能裡也大於tomcat;

頁面壓縮、快取:針對靜態頁面設定壓縮機制及快取機制,減少流量峰值;

服務層

服務拆分及部署:系統拆分多個服務減少耦合度,同時達到熱點隔離及程序隔離的效果;

叢集部署:服務拆分後可根據每個服務的併發量進行橫向拓展;同時也達到叢集隔離的效果;

程式碼開發:樂觀鎖+快取

中介軟體層

快取中介軟體:通常讀多寫少的場景及集中寫入的都可以使用快取中介軟體減少資料庫壓力

訊息中介軟體:通過訊息中介軟體可以將併發任務插入佇列分批執行

資料層

資料庫叢集:通過叢集來提高資料庫併發能力

讀寫分離:通過讀寫分離來分擔資料庫壓力

7.原始碼