1. 程式人生 > >分散式鎖(二)__基於資料庫實現

分散式鎖(二)__基於資料庫實現

原理介紹:

要基於資料庫實現分散式鎖,最簡單的方式可能就是直接建立一張鎖表,然後通過操作該表中的資料來實現。

當需要鎖住某個方法或資源時,就在該表中增加一條記錄,想要釋放鎖的時候就刪除這條記錄

問題:

1.這把鎖依賴資料庫的可用性,資料庫是一個單點,一旦資料庫掛掉,會導致業務系統不可用

2.這把鎖沒有失效時間,一旦解鎖操作失敗,會導致鎖記錄一直在資料庫中,其他執行緒無法再獲得鎖

3.這把鎖只能是非阻塞的,因為資料的insert操作,一旦插入失敗就會直接報錯,沒有獲得鎖的執行緒並不會進入排隊佇列,想要再次獲得鎖就要再次觸發獲得鎖操作。

4,這把鎖是非重入的,同一個執行緒沒有釋放鎖之前無法在獲得該鎖,因為資料已經存在了。

 

解決:

1.資料庫是單點?搞兩個資料庫,資料之前雙向同步,一旦掛掉迅速切換到備庫上。

2.沒有失效時間?做一個定時任務,每隔一定時間把資料庫中的超時資料清理一遍。

3.非阻塞?搞一個while迴圈,直到insert成功再返回

4.非重入的?在資料庫中加個欄位,記錄當前獲得鎖的機器的主機資訊和執行緒資訊,那麼下次再獲取的時候先查詢資料庫,如果當前機器的主機資訊和執行緒資訊在資料庫可以查詢的話,直接把鎖分配給他就可以了。

基於資料庫排他鎖:

除了可以通過增刪操作資料庫中的記錄以外,其實還可以藉助資料中自帶的鎖來實現分散式的鎖。

在查詢語句後面增加for update語句,資料庫會在查詢過程中給資料表增加排他鎖(innodb引擎在加鎖的時候,只有通過索引進行檢索的時候才會使用行級鎖,否則會使用表級鎖)

使用資料庫來實現分散式鎖的方式,這兩種方式都是依賴資料庫的一張表,一種是通過表中的記錄的存在情況確定當前是否有鎖存在,另外一種是通過資料庫的排他鎖來實現分散式鎖。

引入依賴:

<dependency>
	<groupId>org.mybatis</groupId>
	<artifactId>mybatis</artifactId>
	<version>3.4.6</version>
</dependency>

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.11</version>
</dependency>

建立表user:

CREATE TABLE `user` (
  `user_id` varchar(255) NOT NULL,
  `user_name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

MybatisConfig.xml:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
		PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
		"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>

	<settings>
		<!-- 這個地方很重要 設定sql的執行時間 超過時間 沒有完成 就會丟擲異常 這裡設定一秒 -->
		<setting name="defaultStatementTimeout" value="1" />
	</settings>

	<environments default="mysql">
		<environment id="mysql">
			<!-- 配置事務 -->
			<transactionManager type="JDBC"></transactionManager>
			<!-- 配置資料來源 -->
			<dataSource type="POOLED">
				<property name="driver" value="com.mysql.cj.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/clouddb01?useSSL=true" />
				<property name="username" value="root" />
				<property name="password" value="123456" />

				<property name="poolMaximumIdleConnections" value="50" />
				<property name="poolMaximumActiveConnections" value="1000" />
				<property name="poolPingQuery" value="SELECT 1 FROM DUAL" />
				<property name="poolPingEnabled" value="true" />
			</dataSource>
		</environment>
	</environments>

	<mappers>
		<mapper resource="com/th/mapper/userMapper.xml" />
	</mappers>

</configuration>

userMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>  
<!DOCTYPE mapper  PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"  "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<!-- namespace:名稱空間,是某一個 dao 層的具體路徑 -->
<mapper namespace="com.th.dao.user">
	<resultMap type="com.th.entity.User" id="User">
		<result property="userId" column="user_id" />
		<result property="userName" column="user_name" />
	</resultMap>

	<select id="getUserAllEentity" resultType="com.th.entity.User" resultMap="User">
		SELECT * FROM user
	</select>
	
	<update id="update" parameterType="com.th.entity.User">
		UPDATE user SET user_name = #{userName} WHERE user_id=#{userId}
	</update>

</mapper>

user:

package com.th.entity;

public class User {

	private String userId;
	private String userName;

	public User() {
		super();
	}

	public User(String userId, String userName) {
		super();
		this.userId = userId;
		this.userName = userName;
	}

	public String getUserId() {
		return userId;
	}

	public void setUserId(String userId) {
		this.userId = userId;
	}

	public String getUserName() {
		return userName;
	}

	public void setUserName(String userName) {
		this.userName = userName;
	}

}

和一 zk分散式鎖一樣的OrderService介面:

public interface OrderService {

	void createOrder();
}

模式公共資源的OrderCodeGenerator:

package com.th.order;

import java.text.SimpleDateFormat;
import java.util.Date;

public class OrderCodeGenerator {

	private static int i = 0;

	public String getOrderCode() {
		Date now = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");
		return sdf.format(now) + ++i;
	}

}

基於資料庫實現的lock:

package com.th.order;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

import com.th.entity.User;

public class DbLock implements Lock {
	private SqlSession session;
	private User user;

	DbLock(User user) {
		InputStream inputStream = null;
		try {
			inputStream = Resources.getResourceAsStream("MybatisConfig.xml");
		} catch (IOException e) {
			e.printStackTrace();
		}
		SqlSessionFactoryBuilder sessionFactoryBuilder = new SqlSessionFactoryBuilder();
		SqlSessionFactory sessionFactory = sessionFactoryBuilder.build(inputStream);
		this.session = sessionFactory.openSession();
		this.user = user;
	}

	@Override
	public void lock() {
		tryLock();
	}

	@Override
	public boolean tryLock() {
		int count = session.update("com.th.dao.user.update", user);
		return count == 1 ? true : false;
	}

	@Override
	public void unlock() {
		session.commit();
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
		// TODO Auto-generated method stub

	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		return false;
	}

	@Override
	public Condition newCondition() {
		return null;
	}

}

OrderServiceImplWithZkDis 及測試方法main:

package com.th.order;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.Lock;

import com.th.entity.User;

public class OrderServiceImplWithZkDis implements OrderService {

	private static OrderCodeGenerator org = new OrderCodeGenerator();

	// private Lock lock = new ZookeeperDisLock("/LOCK_TEST");

	// private Lock lock = new ZookeeperReAbleDisLock("/LOCK_TEST");
	
	private Lock lock = new DbLock(new User("1","張三丰"));

	@Override
	public void createOrder() {
		String orderCode = null;

		try {
			lock.lock();

			orderCode = org.getOrderCode();
			//TestReLock();

			System.out.println(Thread.currentThread().getName() + "生成訂單:" + orderCode);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}

	}

	public void TestReLock() {
		lock.lock();
		System.out.println(Thread.currentThread().getName() + "測試重入鎖成功...");
		lock.unlock();

	}

	public static void main(String[] args) {
		int num = 20;
		CyclicBarrier cyclicBarrier = new CyclicBarrier(num);

		for (int i = 0; i < num; i++) {
			new Thread(new Runnable() {

				@Override
				public void run() {
					OrderService orderService = new OrderServiceImplWithZkDis();
					System.out.println(Thread.currentThread().getName() + ": 我準備好了");

					try {
						cyclicBarrier.await();
					} catch (Exception e) {
						e.printStackTrace();
					}

					orderService.createOrder();
				}
			}).start();
		}

	}

}

測試結果: