redis分散式鎖java實現解決快取雪崩
快取雪崩:因為快取失效(key生存時間到期)導致所有請求都去查詢資料庫,導致資料庫CPU和記憶體負載過高導致宕機。
快取雪崩原因及解決方案:
使用快取主要解決資料同步,並減少對資料庫訪問次數。因此,通常解決方案往往是使用互斥鎖,讓一個執行緒訪問資料庫,並將資料更新到快取中,其他執行緒訪問快取中資料。如果是基於jvm鎖機制的話,只能解決單機問題,也就是隻讓本機一個執行緒訪問快取,但是分散式條件下是不能使用的。所以,要基於快取的分散式鎖來實現。
以redis為例解釋下實現分散式鎖的原理:
獲取鎖:
所有執行緒操作一個共同的key比如:lock,如果redis中不存在key為lock的值,那麼當前執行緒獲取鎖併為lock設定一個隨機值。
如果lock已經存在了,說明已經有執行緒獲取鎖,該執行緒不能再獲取了。
釋放鎖:
獲取鎖的執行緒操作執行完畢後,清除lock的值,這樣鎖就釋放了。所以,對鎖的操作就是通過對同一個key值的新增和刪除操作。
程式碼:
@Service public class RedisLock implements Lock { @Autowired private JedisConnectionFactory factory; private static final String LOCK="lock"; private ThreadLocal<String> local=new ThreadLocal<String>(); //獲取鎖 @Override public boolean tryLock() { //獲取Jedis的原始資料連線 Jedis jedis = (Jedis)factory.getConnection().getNativeConnection(); String uuid = UUID.randomUUID().toString(); /** 獲取鎖:設定一個隨機值,超期時間1s String key, String value, String nxxx, String expx, int time) nxxx: NX:key不存在時設值 XX:key存在時設值 expx: EX|PX, expire time units: EX = seconds; PX = milliseconds */ String ret = jedis.set(LOCK, uuid, "NX", "PX", 1000); if(!StringUtils.isEmpty(ret)&&ret.equals("OK")){ local.set(uuid); return true; } return false; } /** * 解鎖 */ @Override public void unlock() { String script=null; try { script=FileCopyUtils.copyToString(new FileReader(ResourceUtils.getFile("classpath:cn/rjx/spring/cache/unlock.c"))); } catch (IOException e) { e.printStackTrace(); } Jedis jedis = (Jedis)factory.getConnection().getNativeConnection(); List<String> keys=new ArrayList<String>(); keys.add(LOCK); List<String> args=new ArrayList<String>(); args.add(local.get()); //如果redis中的 lock值和當前執行緒的uuid值相等,刪除Key值 jedis.eval(script, keys, args); } }
刪除鍵值是執行的指令碼unlock.c:
if redis.call("get",KEYS[1])==ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
操作快取的具體流程:
1.當執行緒查詢某一值時先檢視快取是否存在該值。
2.如果存在直接返回主快取中的資料。
3.1如果不存在,只有一個執行緒獲取鎖並去資料庫讀取資料,讀取後更新主快取和備份快取。
3.2 其他執行緒取備份快取中的資料。.
程式碼實現:初始時,主快取和備份快取為空,此時可能會有執行緒獲取的值為空,但是並不影響使用者體驗,使用者可以再重新整理一次。在要求比較高的場景裡面,可以考慮先把資料寫入快取中,可以搭配定時重新整理快取的機制。
public List<Integer> queryCountByLeiMu() {
List<Integer> cacheResult = cacheService.cacheResult("101", "leimu");
if(cacheResult!=null){
logger.info("================get cache=======================");
return cacheResult;
}
if(lock.tryLock()){
logger.info("================get db=======================");
List<Integer> list=empDao.queryCountByLeiMu();
cacheService.cachePut("101", list, "leimu");//主快取
cacheService.cachePut("beifen101", list, "beifenleimu");//備份快取
lock.unlock();
return list;
}else{
logger.info("================get BEIFEN=======================");
//備份中拿
return cacheService.cacheResult("beifen101", "beifenleimu");
}
}
資料同步問題:主快取中key的過期時間比較短,這樣保證儘可能獲取新資料。
bean.xml中快取失效時間設定:
<!-- 開啟快取註解掃描 -->
<cache:annotation-driven />
<bean id="cacheManager" class="org.springframework.data.redis.cache.RedisCacheManager">
<constructor-arg index="0" ref="redisTemplate"></constructor-arg>
<property name="expires">
<map>
<entry key="leimu" value="5"></entry>
<entry key="beifenleimu" value="100"></entry>
</map>
</property>
</bean>
測試方法模擬高併發情景:
@Autowired
LeiMuService leiMuService;
private static final int threadNum=13;
//倒計數器(發令槍) 用於製造執行緒併發執行
private static CountDownLatch cdl=new CountDownLatch(threadNum);
/**
* 模擬高併發條件下,資料庫查詢耗時比較長
* @throws InterruptedException
*/
@Test
public void test04() throws InterruptedException{
for(int i=0;i<threadNum;i++){
new Thread(new UserRequest()).start();
cdl.countDown();//threadNum每次減1,到零時同時執行cdl.await();後邊程式碼
}
//主執行緒掛起,等子執行緒執行完以後
Thread.currentThread().join();
}
private class UserRequest implements Runnable{
@Override
public void run() {
//所有子執行緒在這裡等待,當所有執行緒例項化後,同時停止等待
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//N個子執行緒同時呼叫獲取類目
List<Integer> leimu = leiMuService.queryCountByLeiMu();
logger.info(Thread.currentThread().getName()+"==========================================>"+leimu.size());
}
}
}
缺點:1.非阻塞,短時間不能保證資料一致性
2.鎖失效時間難把握,一般為單執行緒處理時長的兩到三倍
3.可能出現鎖失效情況
4******不能在redis叢集環境中使用(叢集中可用redLock)
建議使用基於zookeeper的分散式鎖實現方式!!.