1. 程式人生 > >redis與zk實現分散式鎖

redis與zk實現分散式鎖

  1. 概述

分散式鎖,如果你有多個機器在訪問同一個共享資源,

那麼這個時候,如果你需要加個鎖,讓多個分散式的機器在訪問共享資源的時候序列起來

那麼這個時候,那個鎖,多個不同機器上的服務共享的鎖,就是分散式鎖

分散式鎖當然有很多種不同的實現方案,redis分散式鎖,zookeeper分散式鎖

  1. 對比

資料庫鎖:

優點:直接使用資料庫,使用簡單。

缺點:分散式系統大多數瓶頸都在資料庫,使用資料庫鎖會增加資料庫負擔。

快取鎖:

優點:效能高,實現起來較為方便,在允許偶發的鎖失效情況,不影響系統正常使用,建議採用快取鎖。

缺點:通過鎖超時機制不是十分可靠,當執行緒獲得鎖後,處理時間過長導致鎖超時,就失效了鎖的作用。

zookeeper鎖:

優點:不依靠超時時間釋放鎖;可靠性高;系統要求高可靠性時,建議採用zookeeper鎖。

缺點:效能比不上快取鎖,因為要頻繁的建立節點刪除節點。
  1. zk實現

     /**
      * ZkSession
      * 
      * @author kris
      * 
      */
     public class ZkSession {
    
     private static Logger log = LoggerFactory.getLogger(ZkSession.class);
    
     private static RetryPolicy retryPolicy;
     
     private InterProcessMutex mutex ;
     private CuratorFramework client ;
    
     public ZkSession() {
     	try {
     		//初試時間為1s 重試3次
     		retryPolicy = new ExponentialBackoffRetry(1000, 3);
     		//"192.168.0.132:2181,192.168.0.132:2182,192.168.0.132:2183"
     		client = CuratorFrameworkFactory.newClient(XxlConfClient.get("cache.host", ""), retryPolicy);
     		//建立zookeeper的客戶端
     		client.start();
     	} catch (Exception e) {
     		e.printStackTrace();
     	}
     }
    
     /**
      * 獲取分散式鎖
      * 
      * @param path
      *            /curator/lock
      */
     public void acquireLock(String path) {
     	boolean flag = false;
     	try {
     		mutex = new InterProcessMutex(client, path);
     		int count = 0;
     		
     		while (true) {
     			try {
     				// 嘗試獲取鎖,最多等待5秒
     				flag = mutex.acquire(2, TimeUnit.SECONDS);
     				Thread currentThread = Thread.currentThread();
     				if (flag) {
     					log.info("執行緒" + currentThread.getId() + "獲取鎖成功!  path:"+path);
     				} else {
     					log.info("執行緒" + currentThread.getId() + "獲取鎖失敗");
     					count++;
     					log.info("the " + count + " times try to acquire lock for " + path + "......");
     					continue;
     				}
     			} catch (Exception e2) {
     				e2.printStackTrace();
     			}
     			break;
     		}
    
     	} catch (Exception e) {
     		e.printStackTrace();
     	} 
     }
     
     /**
      * 釋放鎖
      */
     public void releaseLock(){
     	try {
     		if (null != mutex) {
     			mutex.release();
     		}
    
     		if (null != client) {
     			client.close();
     		}
     	} catch (Exception e) {
     		e.printStackTrace();
     	}
     }
    

    }

3.1 使用

//建立臨時節點路徑
String path = "/curator/lock";

ZkSession lock = new ZkSession().acquireLock(lock);

//...執行業務需求
Thread.sleep(3000);

lock.releaseLock();
  1. redis實現

我這裡是用codis的jodis實現的

public class JodisUtils {

	static {
		JedisPoolConfig config = null;
		try {
			config = initConfig();
			if (!closeFlag) {
				LOG.info("啟動引數 host: " + XxlConfClient.get("cache.host", "") + " path: " + XxlConfClient.get("cache.path", ""));
				pool = RoundRobinJedisPool.create().poolConfig(config).curatorClient(XxlConfClient.get("cache.host", ""), 30000)
						.zkProxyDir(XxlConfClient.get("cache.path", "")).build();
			}
		} catch (Exception e) {
			LOG.info("初始化錯誤,關掉快取" + e.getMessage(), e);
		}
	}
	private static JodisUtils ins;

	public static JodisUtils getInstance() {
		if (ins == null) {
			ins = new JodisUtils();
		}
		return ins;
	}

	/**
	 * 原子獲取鎖
	 * @param key
	 * @param value
	 * @param expireMillis
	 * @return
	 */
	public boolean acquireLock(String key, String value, int expireMillis) {
		try {
			Jedis jedis = pool.getResource();
			try {
				// nx = not exist, px= 單位是毫秒
				boolean flag = false;
				String result = jedis.set(key, value, "NX", "PX", expireMillis);
				if (result != null && result.equalsIgnoreCase("OK")) {
					flag = true;
				}
				return flag;
			} catch (Exception e) {
				LOG.error(e.getMessage(), e);
				return false;
			} finally {
				jedis.close();
			}
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
			return false;
		}
	}

	/**
	 * 執行lua指令碼釋放鎖(刪除)
	 * @param key
	 * @param value
	 * @return
	 */
	public boolean releaseLock(String key, String value) {
		try {
			Jedis jedis = pool.getResource();
			try {
				boolean flag = false;
				String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] " + "then " + " return redis.call(\"del\",KEYS[1]) " + "else " + " return 0 " + "end ";
				Object result = jedis.eval(script, Collections.singletonList(key), Collections.singletonList(value));
				if (Objects.equals(UNLOCK_SUCCESS, result)) {
					flag = true;
				}
				return flag;
			} catch (Exception e) {
				e.printStackTrace();
				return false;
			} finally {
				//jedis.unwatch();
				jedis.close();
			}
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
			return false;
		}
	}

}

4.1 使用

String key = "testLock";
String uuid = System.currentTimeMillis()+UUID.randomUUID().toString();
	
int count = 0;
	
while (true) {
	try {
		boolean acquireLock = JodisUtils.getInstance().acquireLock(key, uuid, 30000);
		Thread currentThread = Thread.currentThread();
			
			
		if (acquireLock) {
			LOG.info("執行緒" + currentThread.getId() + "獲取 鎖 成功!  key:"+key);
		} else {
			LOG.info("執行緒" + currentThread.getId() + "獲取 鎖 失敗");
			
			count++;
			Thread.sleep(100);
			LOG.info("the " + count + " times try to acquire lock for " + key + "......");
			continue;
		}
		
		//執行業務需求
		Thread.sleep(500);
		boolean releaseLock = JodisUtils.getInstance().releaseLock(key, uuid);
			
		if (releaseLock) {
			LOG.info("執行緒" + currentThread.getId() + "釋放 鎖 成功!  key:"+key);
			break;
		}
	} catch (Exception e2) {
		e2.printStackTrace();
	}
}