分散式鎖(二)__基於資料庫實現
阿新 • • 發佈:2018-11-10
原理介紹:
要基於資料庫實現分散式鎖,最簡單的方式可能就是直接建立一張鎖表,然後通過操作該表中的資料來實現。
當需要鎖住某個方法或資源時,就在該表中增加一條記錄,想要釋放鎖的時候就刪除這條記錄
問題:
1.這把鎖依賴資料庫的可用性,資料庫是一個單點,一旦資料庫掛掉,會導致業務系統不可用
2.這把鎖沒有失效時間,一旦解鎖操作失敗,會導致鎖記錄一直在資料庫中,其他執行緒無法再獲得鎖
3.這把鎖只能是非阻塞的,因為資料的insert操作,一旦插入失敗就會直接報錯,沒有獲得鎖的執行緒並不會進入排隊佇列,想要再次獲得鎖就要再次觸發獲得鎖操作。
4,這把鎖是非重入的,同一個執行緒沒有釋放鎖之前無法在獲得該鎖,因為資料已經存在了。
解決:
1.資料庫是單點?搞兩個資料庫,資料之前雙向同步,一旦掛掉迅速切換到備庫上。
2.沒有失效時間?做一個定時任務,每隔一定時間把資料庫中的超時資料清理一遍。
3.非阻塞?搞一個while迴圈,直到insert成功再返回
4.非重入的?在資料庫中加個欄位,記錄當前獲得鎖的機器的主機資訊和執行緒資訊,那麼下次再獲取的時候先查詢資料庫,如果當前機器的主機資訊和執行緒資訊在資料庫可以查詢的話,直接把鎖分配給他就可以了。
基於資料庫排他鎖:
除了可以通過增刪操作資料庫中的記錄以外,其實還可以藉助資料中自帶的鎖來實現分散式的鎖。
在查詢語句後面增加for update語句,資料庫會在查詢過程中給資料表增加排他鎖(innodb引擎在加鎖的時候,只有通過索引進行檢索的時候才會使用行級鎖,否則會使用表級鎖)
使用資料庫來實現分散式鎖的方式,這兩種方式都是依賴資料庫的一張表,一種是通過表中的記錄的存在情況確定當前是否有鎖存在,另外一種是通過資料庫的排他鎖來實現分散式鎖。
引入依賴:
<dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.11</version> </dependency>
建立表user:
CREATE TABLE `user` (
`user_id` varchar(255) NOT NULL,
`user_name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
MybatisConfig.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<settings>
<!-- 這個地方很重要 設定sql的執行時間 超過時間 沒有完成 就會丟擲異常 這裡設定一秒 -->
<setting name="defaultStatementTimeout" value="1" />
</settings>
<environments default="mysql">
<environment id="mysql">
<!-- 配置事務 -->
<transactionManager type="JDBC"></transactionManager>
<!-- 配置資料來源 -->
<dataSource type="POOLED">
<property name="driver" value="com.mysql.cj.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/clouddb01?useSSL=true" />
<property name="username" value="root" />
<property name="password" value="123456" />
<property name="poolMaximumIdleConnections" value="50" />
<property name="poolMaximumActiveConnections" value="1000" />
<property name="poolPingQuery" value="SELECT 1 FROM DUAL" />
<property name="poolPingEnabled" value="true" />
</dataSource>
</environment>
</environments>
<mappers>
<mapper resource="com/th/mapper/userMapper.xml" />
</mappers>
</configuration>
userMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- namespace:名稱空間,是某一個 dao 層的具體路徑 -->
<mapper namespace="com.th.dao.user">
<resultMap type="com.th.entity.User" id="User">
<result property="userId" column="user_id" />
<result property="userName" column="user_name" />
</resultMap>
<select id="getUserAllEentity" resultType="com.th.entity.User" resultMap="User">
SELECT * FROM user
</select>
<update id="update" parameterType="com.th.entity.User">
UPDATE user SET user_name = #{userName} WHERE user_id=#{userId}
</update>
</mapper>
user:
package com.th.entity;
public class User {
private String userId;
private String userName;
public User() {
super();
}
public User(String userId, String userName) {
super();
this.userId = userId;
this.userName = userName;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}
和一 zk分散式鎖一樣的OrderService介面:
public interface OrderService {
void createOrder();
}
模式公共資源的OrderCodeGenerator:
package com.th.order;
import java.text.SimpleDateFormat;
import java.util.Date;
public class OrderCodeGenerator {
private static int i = 0;
public String getOrderCode() {
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");
return sdf.format(now) + ++i;
}
}
基於資料庫實現的lock:
package com.th.order;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import com.th.entity.User;
public class DbLock implements Lock {
private SqlSession session;
private User user;
DbLock(User user) {
InputStream inputStream = null;
try {
inputStream = Resources.getResourceAsStream("MybatisConfig.xml");
} catch (IOException e) {
e.printStackTrace();
}
SqlSessionFactoryBuilder sessionFactoryBuilder = new SqlSessionFactoryBuilder();
SqlSessionFactory sessionFactory = sessionFactoryBuilder.build(inputStream);
this.session = sessionFactory.openSession();
this.user = user;
}
@Override
public void lock() {
tryLock();
}
@Override
public boolean tryLock() {
int count = session.update("com.th.dao.user.update", user);
return count == 1 ? true : false;
}
@Override
public void unlock() {
session.commit();
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
OrderServiceImplWithZkDis 及測試方法main:
package com.th.order;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.Lock;
import com.th.entity.User;
public class OrderServiceImplWithZkDis implements OrderService {
private static OrderCodeGenerator org = new OrderCodeGenerator();
// private Lock lock = new ZookeeperDisLock("/LOCK_TEST");
// private Lock lock = new ZookeeperReAbleDisLock("/LOCK_TEST");
private Lock lock = new DbLock(new User("1","張三丰"));
@Override
public void createOrder() {
String orderCode = null;
try {
lock.lock();
orderCode = org.getOrderCode();
//TestReLock();
System.out.println(Thread.currentThread().getName() + "生成訂單:" + orderCode);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void TestReLock() {
lock.lock();
System.out.println(Thread.currentThread().getName() + "測試重入鎖成功...");
lock.unlock();
}
public static void main(String[] args) {
int num = 20;
CyclicBarrier cyclicBarrier = new CyclicBarrier(num);
for (int i = 0; i < num; i++) {
new Thread(new Runnable() {
@Override
public void run() {
OrderService orderService = new OrderServiceImplWithZkDis();
System.out.println(Thread.currentThread().getName() + ": 我準備好了");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
orderService.createOrder();
}
}).start();
}
}
}