1. 程式人生 > >redis分散式鎖java實現解決快取雪崩

redis分散式鎖java實現解決快取雪崩

快取雪崩:因為快取失效(key生存時間到期)導致所有請求都去查詢資料庫,導致資料庫CPU和記憶體負載過高導致宕機。

快取雪崩原因及解決方案:


使用快取主要解決資料同步,並減少對資料庫訪問次數。因此,通常解決方案往往是使用互斥鎖,讓一個執行緒訪問資料庫,並將資料更新到快取中,其他執行緒訪問快取中資料。如果是基於jvm鎖機制的話,只能解決單機問題,也就是隻讓本機一個執行緒訪問快取,但是分散式條件下是不能使用的。所以,要基於快取的分散式鎖來實現。

以redis為例解釋下實現分散式鎖的原理:

獲取鎖:

所有執行緒操作一個共同的key比如:lock,如果redis中不存在key為lock的值,那麼當前執行緒獲取鎖併為lock設定一個隨機值。

如果lock已經存在了,說明已經有執行緒獲取鎖,該執行緒不能再獲取了。

釋放鎖:

獲取鎖的執行緒操作執行完畢後,清除lock的值,這樣鎖就釋放了。所以,對鎖的操作就是通過對同一個key值的新增和刪除操作。

程式碼:

@Service
public class RedisLock implements Lock {
	@Autowired
	private JedisConnectionFactory factory;
	
	private static final String LOCK="lock";
	
	private ThreadLocal<String> local=new ThreadLocal<String>();

	//獲取鎖
	@Override
	public boolean tryLock() {
		//獲取Jedis的原始資料連線
		Jedis jedis = (Jedis)factory.getConnection().getNativeConnection();
		String uuid = UUID.randomUUID().toString();
		/** 獲取鎖:設定一個隨機值,超期時間1s
			String key, String value, String nxxx, String expx, int time)
			nxxx: NX:key不存在時設值     XX:key存在時設值
			expx: EX|PX, expire time units: EX = seconds; PX = milliseconds
		 */
		String ret = jedis.set(LOCK, uuid, "NX", "PX", 1000);
		if(!StringUtils.isEmpty(ret)&&ret.equals("OK")){
			local.set(uuid);
			return true;
		}
		return false;
	}


	/**
	 * 解鎖
	 */
	@Override
	public void unlock() {
		String script=null;
		try {
			script=FileCopyUtils.copyToString(new FileReader(ResourceUtils.getFile("classpath:cn/rjx/spring/cache/unlock.c")));
		} catch (IOException e) {
			e.printStackTrace();
		}
		Jedis jedis = (Jedis)factory.getConnection().getNativeConnection();
		List<String> keys=new ArrayList<String>();
		keys.add(LOCK);
		List<String> args=new ArrayList<String>();
		args.add(local.get());
		//如果redis中的 lock值和當前執行緒的uuid值相等,刪除Key值
		jedis.eval(script, keys, args);
	}

}

刪除鍵值是執行的指令碼unlock.c:
if redis.call("get",KEYS[1])==ARGV[1] then
	return redis.call("del",KEYS[1])
else
	return 0
end


操作快取的具體流程:

1.當執行緒查詢某一值時先檢視快取是否存在該值。

2.如果存在直接返回主快取中的資料。

3.1如果不存在,只有一個執行緒獲取鎖並去資料庫讀取資料,讀取後更新主快取和備份快取。

3.2 其他執行緒取備份快取中的資料。.

程式碼實現:初始時,主快取和備份快取為空,此時可能會有執行緒獲取的值為空,但是並不影響使用者體驗,使用者可以再重新整理一次。在要求比較高的場景裡面,可以考慮先把資料寫入快取中,可以搭配定時重新整理快取的機制。

public List<Integer> queryCountByLeiMu() {
		List<Integer> cacheResult = cacheService.cacheResult("101", "leimu");
		if(cacheResult!=null){
			logger.info("================get cache=======================");
			return cacheResult;
		}
		if(lock.tryLock()){
			logger.info("================get db=======================");
			List<Integer> list=empDao.queryCountByLeiMu();
			cacheService.cachePut("101", list, "leimu");//主快取
			cacheService.cachePut("beifen101", list, "beifenleimu");//備份快取
			lock.unlock();
			return list;
		}else{
			logger.info("================get BEIFEN=======================");
			//備份中拿
			return cacheService.cacheResult("beifen101", "beifenleimu");
		}
		
	}

資料同步問題:主快取中key的過期時間比較短,這樣保證儘可能獲取新資料。

bean.xml中快取失效時間設定:

   <!-- 開啟快取註解掃描 -->
  <cache:annotation-driven />
  
   <bean id="cacheManager" class="org.springframework.data.redis.cache.RedisCacheManager">
   		<constructor-arg index="0" ref="redisTemplate"></constructor-arg>
   		<property name="expires">
   			<map>
   				<entry key="leimu" value="5"></entry>
   				<entry key="beifenleimu" value="100"></entry>
   			</map>
   		</property>
   </bean> 

測試方法模擬高併發情景:
@Autowired
	LeiMuService leiMuService;
	private static final int threadNum=13;
	//倒計數器(發令槍)   用於製造執行緒併發執行
	private static CountDownLatch cdl=new CountDownLatch(threadNum);
	
	/**
	 * 模擬高併發條件下,資料庫查詢耗時比較長
	 * @throws InterruptedException 
	 */
	@Test
	public void test04() throws InterruptedException{
		for(int i=0;i<threadNum;i++){
			new Thread(new UserRequest()).start();
			cdl.countDown();//threadNum每次減1,到零時同時執行cdl.await();後邊程式碼
		}
		//主執行緒掛起,等子執行緒執行完以後
		Thread.currentThread().join();
	}
	
	private class UserRequest implements Runnable{
		@Override
		public void run() {
			//所有子執行緒在這裡等待,當所有執行緒例項化後,同時停止等待
			try {
				cdl.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			//N個子執行緒同時呼叫獲取類目
			List<Integer> leimu = leiMuService.queryCountByLeiMu();
			logger.info(Thread.currentThread().getName()+"==========================================>"+leimu.size());
		
		}
		
	}
	
}


缺點:1.非阻塞,短時間不能保證資料一致性

2.鎖失效時間難把握,一般為單執行緒處理時長的兩到三倍

3.可能出現鎖失效情況

4******不能在redis叢集環境中使用(叢集中可用redLock)

建議使用基於zookeeper的分散式鎖實現方式!!.