1. 程式人生 > >分布式鎖的實現方式及原理

分布式鎖的實現方式及原理

模擬 才有 border zook zed pla 不為 .info byte

轉載自http://www.jb51.net/article/118312.htm * 在集群等多服務器中經常使用到同步處理一下業務,這是普通的事務是滿足不了業務需求,需要分布式鎖 * * 分布式鎖的常用3種實現: * 0.數據庫樂觀鎖實現 * 1.Redis實現 --- 使用redis的setnx()、get()、getset()方法,用於分布式鎖,解決死鎖問題 * 2、zookeeper實現 Zookeeper實現 * 參考:http://surlymo.iteye.com/blog/2082684 * http://www.jb51.net/article/103617.htm
* http://www.hollischuang.com/archives/1716?utm_source=tuicool&utm_medium=referral 1、實現原理: 基於zookeeper瞬時有序節點實現的分布式鎖,其主要邏輯如下(該圖來自於IBM網站)。大致思想即為:每個客戶端對某個功能加鎖時,在zookeeper上的與該功能對應的指定節點的目錄下,生成一個唯一的瞬時有序節點。判斷是否獲取鎖的方式很簡單,只需要判斷有序節點中序號最小的一個。當釋放鎖的時候,只需將這個瞬時節點刪除即可。同時,其可以避免服務宕機導致的鎖無法釋放,而產生的死鎖問題。 2、優點
鎖安全性高,zk可持久化 3、缺點 性能開銷比較高。因為其需要動態產生、銷毀瞬時節點來實現鎖功能。 4、實現 可以直接采用zookeeper第三方庫curator即可方便地實現分布式鎖 Redis實現分布式鎖的原理: * 1.通過setnx(lock_timeout)實現,如果設置了鎖返回1, 已經有值沒有設置成功返回0 * 2.死鎖問題:通過實踐來判斷是否過期,如果已經過期,獲取到過期時間get(lockKey),然後getset(lock_timeout)判斷是否和get相同, * 相同則證明已經加鎖成功,因為可能導致多線程同時執行getset(lock_timeout)方法,這可能導致多線程都只需getset後,對於判斷加鎖成功的線程,
* 再加expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS)過期時間,防止多個線程同時疊加時間,導致鎖時效時間翻倍 * 3.針對集群服務器時間不一致問題,可以調用redis的time()獲取當前時間

2.Redis分分布式鎖的代碼實現

1.定義鎖接口

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.jay.service.redis; /** * Redis分布式鎖接口 * Created by hetiewei on 2017/4/7. */ public interface RedisDistributionLock { /** * 加鎖成功,返回加鎖時間 * @param lockKey * @param threadName * @return */ public long lock(String lockKey, String threadName); /** * 解鎖, 需要更新加鎖時間,判斷是否有權限 * @param lockKey * @param lockValue * @param threadName */ public void unlock(String lockKey, long lockValue, String threadName); /** * 多服務器集群,使用下面的方法,代替System.currentTimeMillis(),獲取redis時間,避免多服務的時間不一致問題!!! * @return */ public long currtTimeForRedis(); }

2.定義鎖實現

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 package com.jay.service.redis.impl; import com.jay.service.redis.RedisDistributionLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import java.util.concurrent.TimeUnit; /** * Created by hetiewei on 2017/4/7. */ public class RedisLockImpl implements RedisDistributionLock { //加鎖超時時間,單位毫秒, 即:加鎖時間內執行完操作,如果未完成會有並發現象 private static final long LOCK_TIMEOUT = 5*1000; private static final Logger LOG = LoggerFactory.getLogger(RedisLockImpl.class); private StringRedisTemplate redisTemplate; public RedisLockImpl(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } /** * 加鎖 * 取到鎖加鎖,取不到鎖一直等待知道獲得鎖 * @param lockKey * @param threadName * @return */ @Override public synchronized long lock(String lockKey, String threadName) { LOG.info(threadName+"開始執行加鎖"); while (true){ //循環獲取鎖 //鎖時間 Long lock_timeout = currtTimeForRedis()+ LOCK_TIMEOUT +1; if (redisTemplate.execute(new RedisCallback<Boolean>() { @Override public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException { //定義序列化方式 RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); byte[] value = serializer.serialize(lock_timeout.toString()); boolean flag = redisConnection.setNX(lockKey.getBytes(), value); return flag; } })){ //如果加鎖成功 LOG.info(threadName +"加鎖成功 ++++ 111111"); //設置超時時間,釋放內存 redisTemplate.expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS); return lock_timeout; }else { //獲取redis裏面的時間 String result = redisTemplate.opsForValue().get(lockKey); Long currt_lock_timeout_str = result==null?null:Long.parseLong(result); //鎖已經失效 if (currt_lock_timeout_str != null && currt_lock_timeout_str < System.currentTimeMillis()){ //判斷是否為空,不為空時,說明已經失效,如果被其他線程設置了值,則第二個條件判斷無法執行 //獲取上一個鎖到期時間,並設置現在的鎖到期時間 Long old_lock_timeout_Str = Long.valueOf(redisTemplate.opsForValue().getAndSet(lockKey, lock_timeout.toString())); if (old_lock_timeout_Str != null && old_lock_timeout_Str.equals(currt_lock_timeout_str)){ //多線程運行時,多個線程簽好都到了這裏,但只有一個線程的設置值和當前值相同,它才有權利獲取鎖 LOG.info(threadName + "加鎖成功 ++++ 22222"); //設置超時間,釋放內存 redisTemplate.expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS); //返回加鎖時間 return lock_timeout; } } } try { LOG.info(threadName +"等待加鎖, 睡眠100毫秒"); // TimeUnit.MILLISECONDS.sleep(100); TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 解鎖 * @param lockKey * @param lockValue * @param threadName */ @Override public synchronized void unlock(String lockKey, long lockValue, String threadName) { LOG.info(threadName + "執行解鎖==========");//正常直接刪除 如果異常關閉判斷加鎖會判斷過期時間 //獲取redis中設置的時間 String result = redisTemplate.opsForValue().get(lockKey); Long currt_lock_timeout_str = result ==null?null:Long.valueOf(result); //如果是加鎖者,則刪除鎖, 如果不是,則等待自動過期,重新競爭加鎖 if (currt_lock_timeout_str !=null && currt_lock_timeout_str == lockValue){ redisTemplate.delete(lockKey); LOG.info(threadName + "解鎖成功------------------"); } } /** * 多服務器集群,使用下面的方法,代替System.currentTimeMillis(),獲取redis時間,避免多服務的時間不一致問題!!! * @return */ @Override public long currtTimeForRedis(){ return redisTemplate.execute(new RedisCallback<Long>() { @Override public Long doInRedis(RedisConnection redisConnection) throws DataAccessException { return redisConnection.time(); } }); } }

3.分布式鎖驗證

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @RestController @RequestMapping("/distribution/redis") public class RedisLockController { private static final String LOCK_NO = "redis_distribution_lock_no_"; private static int i = 0; private ExecutorService service; @Autowired private StringRedisTemplate redisTemplate; /** * 模擬1000個線程同時執行業務,修改資源 * * 使用線程池定義了20個線程 * */ @GetMapping("lock1") public void testRedisDistributionLock1(){ service = Executors.newFixedThreadPool(20); for (int i=0;i<1000;i++){ service.execute(new Runnable() { @Override public void run() { task(Thread.currentThread().getName()); } }); } } @GetMapping("/{key}") public String getValue(@PathVariable("key") String key){ Serializable result = redisTemplate.opsForValue().get(key); return result.toString(); } private void task(String name) { // System.out.println(name + "任務執行中"+(i++)); //創建一個redis分布式鎖 RedisLockImpl redisLock = new RedisLockImpl(redisTemplate); //加鎖時間 Long lockTime; if ((lockTime = redisLock.lock((LOCK_NO+1)+"", name))!=null){ //開始執行任務 System.out.println(name + "任務執行中"+(i++)); //任務執行完畢 關閉鎖 redisLock.unlock((LOCK_NO+1)+"", lockTime, name); } } }

分布式鎖的實現方式及原理