1. 程式人生 > >Netty遊戲伺服器實戰開發(8):利用redis或者zookeeper實現3pc分散式事務鎖(二)。支撐騰訊系列某手遊百萬級流量公測

Netty遊戲伺服器實戰開發(8):利用redis或者zookeeper實現3pc分散式事務鎖(二)。支撐騰訊系列某手遊百萬級流量公測

導讀:在上篇文章中介紹了分散式事務專案的基本原理和工程元件,我們瞭解到了分散式事務的理論知識。處於實戰的經驗,我們將理論知識使用到實際專案中。所以我們將藉助idea中maven工程 來實戰我們的專案。

回到正文:

在上篇文章中我們已經把需要的準備工作做好了。現在我們需要將如何實現分散式3PC事務提交鎖。

先睹為快

首先我們先來體驗一下事務提交鎖的過程,在本專案中我們將在Windows環境下搭建redis環境和zookeeper環境。下面就是我們只需一段分散式加鎖程式的過程。一段執行鎖發生異常的行為:

事務鎖執行過程

定義事物鎖的型別: 我們使用分散式事務鎖的時候我們需要提供如下幾種型別的鎖:

  • 1:寫鎖,WRITE
  • 2:讀鎖,READ
  • 3:獨山寫鎖,到時間釋放:WRITE_TIME
  • 4:強制時間鎖,無論獲取鎖成功,強制時間鎖, 到時間時間釋放,FORCE_WRITE_TIME

更具上面分析我們將定義一個列舉類來列舉上面所需要的鎖。NettyTransactionLockType.java

package com.twjitm.transaction.transaction.enums;

/**
 * 事物鎖型別
 *
 * @author twjitm- [Created on 2018-08-27 11:50]
 * @jdk java version "1.8.0_77"
 */
public enum  NettyTransactionLockType {
    /**獨自佔有鎖 */
    WRITE,
    /**讀鎖*/
    READ,
    /** 獨自寫佔有,到時間才會釋放鎖*/
    WRITE_TIME,
    /** 獨自寫佔有,無論獲取鎖成功,強制時間鎖, 到時間時間釋放*/
    FORCE_WRITE_TIME,
    ;
}

有了鎖的列舉,那麼我們將來實現事務實體的行為,試想一下。一個事務的執行過程。由如下行為:

  • 1:是否能夠建立鎖
  • 2:是否能夠嘗試提交鎖
  • 3:是否能夠正式提交鎖
  • 4:提交過程是否發生異常
  • 5:發生異常按照不同粒度回滾鎖
  • 6:回滾鎖是否發生異常
  • 7:是否釋放鎖
  • 8:釋放鎖是否發生異常
  • 9:補償提交鎖

特別說明:事務、事務實體、和事務鎖不是一個概念。不要混淆了。

根據上面說的我們來定義這些事務實體行為介面。來細化事務的每個階段。NettyTransactionEntityInterface.java 介面

package com.twjitm.transaction.transaction.entity;

import com.twjitm.transaction.lock.NettyTransactionLockInterface;
import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
import com.twjitm.transaction.transaction.exception.NettyTransactionException;

/**
 * @author twjitm- [Created on 2018-08-27 10:16]
 * @jdk java version "1.8.0_77"
 */
public interface NettyTransactionEntityInterface {

    /**
     * 事物提交後需要執行的邏輯,提交這個事物主要
     * 的業務邏輯
     *
     * @throws NettyTransactionException
     */
    void commit() throws NettyTransactionException;


    /**
     * 回滾,事務在執行過程中發生異常後需要回滾操作的邏輯
     *
     * @throws NettyTransactionException
     */
    void rollback() throws NettyTransactionException;

    /**
     * 嘗試性提交
     */
    NettyTransactionCommitResult tryCommit() throws NettyTransactionException;


    /**
     * 是否可以建立鎖
     *
     * @return
     */
    boolean createNettyTransactionLock(long seconds) throws NettyTransactionException;

    /**
     * 釋放鎖
     *
     * @return
     */
    void releaseNettyTransactionLock();

    /**
     * 強制釋放鎖
     *
     * @return
     */
    void forceReleaseNettyTransactionLock();

    String getInfo();

    /**
     * 是否需要執行
     *
     * @return
     */
    boolean needCommit();

    /**
     * 獲取鎖內容
     *
     * @return
     */
    NettyTransactionLockInterface getNettyTransactionLockInterface();
}

一個事務實體需要由事務的操作。所以我們定義個事務實體來實現事務的行為操作。所以需要實現事務行為介面,由於系統中既對redis實現方式的支援也對zookeeper實現的方式支援。所以我們需要定義兩種型別的事務實體。首先我們來描述一下redis實現的事務實體。

AbstractNettyTransactionEntity.java

package com.twjitm.transaction.transaction.entity;

import com.twjitm.transaction.lock.NettyTransactionLock;
import com.twjitm.transaction.lock.NettyTransactionLockInterface;
import com.twjitm.transaction.lock.NettyTransactionReadLock;
import com.twjitm.transaction.service.redis.NettyTransactionRedisService;
import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
import com.twjitm.transaction.transaction.enums.NettyTransactionLockType;
import com.twjitm.transaction.transaction.exception.NettyTransactionException;

import java.util.BitSet;

/**
 * 基於redis實現分散式事物鎖
 * 抽象事物實體類,所有的事物鎖實體必須繼承
 * 本抽象類,實現自己的commit後的方法和rollback方法等使用者實現方法
 * <p>
 * 這些方法不是建立鎖或者回滾鎖方法,而是實現的是業務方法。需要注意的是
 * 不能再次方法中建立和鎖欄位一樣的欄位,否者造成資料一致性錯誤
 *
 * @author twjitm- [Created on 2018-08-27 11:48]
 * @jdk java version "1.8.0_77"
 */
public abstract class AbstractNettyTransactionEntity implements NettyTransactionEntityInterface {
    /**
     * 進度設定集合 主要用於rollback
     */
    private BitSet progressBitSet;

    /**
     * 事務鎖
     */
    private NettyTransactionLockInterface nettyTransactionLock;

    /**
     * 鎖型別
     */
    private NettyTransactionLockType nettyTransactionLockType;

    /**
     * 鎖的正向標誌(主要用於讀取的時候)
     */
    private boolean rejectFlag = false;

    public AbstractNettyTransactionEntity(NettyTransactionEntityCause cause,
                                          String key,
                                          NettyTransactionRedisService redisService) {
        this.progressBitSet = new BitSet();
        this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause);
        this.nettyTransactionLockType = NettyTransactionLockType.WRITE;
    }

    /**
     * 抽象分散式事物實體
     *
     * @param cause                    事物產生原因
     * @param key                      事物key
     * @param redisService             事物支援的redis服務
     * @param nettyTransactionLockType 事物鎖型別
     */
    public AbstractNettyTransactionEntity(NettyTransactionEntityCause cause,
                                          String key,
                                          NettyTransactionRedisService redisService,
                                          NettyTransactionLockType nettyTransactionLockType) {
        this.progressBitSet = new BitSet();
        if (nettyTransactionLockType.equals(NettyTransactionLockType.READ)) {
            this.nettyTransactionLock = new NettyTransactionReadLock(key, redisService, cause);
            this.nettyTransactionLockType = NettyTransactionLockType.READ;
        } else {
            this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause);
            this.nettyTransactionLockType = NettyTransactionLockType.WRITE;
        }
    }

    public AbstractNettyTransactionEntity(NettyTransactionEntityCause cause,
                                          String key,
                                          NettyTransactionRedisService redisService,
                                          NettyTransactionLockType nettyTransactionLockType, int lockTime) {
        this.progressBitSet = new BitSet();
        if (nettyTransactionLockType.equals(NettyTransactionLockType.READ)) {
            this.nettyTransactionLock = new NettyTransactionReadLock(key, redisService, cause);
            this.nettyTransactionLockType = NettyTransactionLockType.READ;
        }
        //獨佔鎖
        else if (nettyTransactionLockType.equals(NettyTransactionLockType.FORCE_WRITE_TIME)) {
            this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause, lockTime, true);
        } else {
            //非獨佔鎖
            this.nettyTransactionLock = new NettyTransactionLock(key, redisService, cause, lockTime, false);
        }
        this.nettyTransactionLockType = nettyTransactionLockType;
    }


    /**
     * 是否能夠建立事物鎖
     *
     * @param seconds
     * @return
     * @throws NettyTransactionException
     */
    @Override
    public boolean createNettyTransactionLock(long seconds) throws NettyTransactionException {
        boolean result = nettyTransactionLock.create(seconds);
        if (rejectFlag) {
            result = !result;
        }
        return result;
    }

    public void setRejectFlag(boolean rejectFlag) {
        this.rejectFlag = rejectFlag;
    }

    /**
     * 釋放鎖
     */
    @Override
    public void releaseNettyTransactionLock() {
        if (this.nettyTransactionLockType.equals(NettyTransactionLockType.FORCE_WRITE_TIME) || this.nettyTransactionLockType.equals(NettyTransactionLockType.WRITE_TIME)) {
            return;
        }
        this.nettyTransactionLock.destroy();
    }

    /**
     * 記錄事務提交的進度,用於回滾操作。
     * 根據進度進行不同程度的回滾
     *
     * @param step
     */
    public void setTransactionCommitProgress(int step) {
        if (progressBitSet != null) {
            progressBitSet.set(step);
        }
    }

    /**
     * 檢查事物鎖所處於的進度狀態
     *
     * @param step
     * @return
     */
    public boolean checkTransactionCommitProgress(int step) {
        return this.progressBitSet.get(step);
    }


    @Override
    public String getInfo() {
        return this.nettyTransactionLock.getInfo();
    }

    /**
     * 強制釋放鎖
     */
    @Override
    public void forceReleaseNettyTransactionLock() {
        this.nettyTransactionLock.destroy();
    }


    @Override
    public boolean needCommit() {
        return !this.nettyTransactionLockType.equals(NettyTransactionLockType.READ);
    }

    @Override
    public NettyTransactionLockInterface getNettyTransactionLockInterface() {
        return this.nettyTransactionLock;
    }

    public BitSet getProgressBitSet() {
        return progressBitSet;
    }
}

程式碼中的描述一樣。所有的具體事務型別都必須要繼承此類。保證每個事務都具有基本的操作。而更具實現類來具體實現每個事務產生的不同結果做不同處理。

下面我們來描述一下使用zookeeper的方式來實現分散式事務實體。同樣事務鎖也具有相同的行為。

package com.twjitm.transaction.transaction.entity;

import com.twjitm.transaction.lock.NettyTransactionLockInterface;
import com.twjitm.transaction.lock.NettyTransactionZkLock;
import com.twjitm.transaction.service.zookeeper.NettyTransactionZookeeperService;
import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
import com.twjitm.transaction.transaction.enums.NettyTransactionLockType;
import com.twjitm.transaction.transaction.exception.NettyTransactionException;

import java.util.BitSet;

/**
 * 基於zookeeper 實現的分散式事物鎖,、
 * <p>
 * zookeeper 分散式鎖僅僅支援獨佔鎖模式
 *
 * @author twjtim- [Created on 2018-08-29 16:18]
 * @jdk java version "1.8.0_77"
 */
public abstract class AbstractNettyTransactionZkEntity implements NettyTransactionEntityInterface {

    /**
     * 進度設定集合 主要用於rollback
     */
    private BitSet progressBitSet;

    /**
     * 事務鎖
     */
    private NettyTransactionLockInterface nettyTransactionLock;

    /**
     * 鎖型別
     */
    private NettyTransactionLockType nettyTransactionLockType;


    /**
     * 構建一個zookeeper型別的獨佔鎖實體物件
     *
     * @param cause
     * @param key
     * @param zookeeperService
     */
    public AbstractNettyTransactionZkEntity(NettyTransactionEntityCause cause,
                                            String key,
                                            NettyTransactionZookeeperService
                                                    zookeeperService) {
        this.progressBitSet = new BitSet();
        this.nettyTransactionLock = new NettyTransactionZkLock(key,
                zookeeperService, cause);
        this.nettyTransactionLockType = NettyTransactionLockType.WRITE;

    }


    /**
     * 建立一個鎖
     *
     * @param seconds
     * @return
     * @throws NettyTransactionException
     */
    @Override
    public boolean createNettyTransactionLock(long seconds) throws NettyTransactionException {

        return this.nettyTransactionLock.create(seconds);
    }


    /**
     * 記錄事務提交的進度,用於回滾操作。
     * 根據進度進行不同程度的回滾
     *
     * @param step
     */
    public void setTransactionCommitProgress(int step) {
        if (progressBitSet != null) {
            progressBitSet.set(step);
        }
    }


    /**
     * 檢查事物鎖所處於的進度狀態
     *
     * @param step
     * @return
     */
    public boolean checkTransactionCommitProgress(int step) {

        return this.progressBitSet.get(step);
    }


    /**
     * 釋放一個鎖請求
     */
    @Override
    public void releaseNettyTransactionLock() {
        this.nettyTransactionLock.destroy();
    }

    @Override
    public void forceReleaseNettyTransactionLock() {
        this.nettyTransactionLock.destroy();
    }

    @Override
    public String getInfo() {
        return this.nettyTransactionLock.getInfo() + this.nettyTransactionLockType.name();
    }

    @Override
    public boolean needCommit() {
        return !this.nettyTransactionLockType.equals(NettyTransactionLockType.READ);
    }

    @Override
    public NettyTransactionLockInterface getNettyTransactionLockInterface() {
        return this.nettyTransactionLock;
    }
}

功能和redis實現的方式是一樣的,只不過底層支援的方式不太一樣而已。

定義完事務實體後,我們來描述一下事務。事務實體是提供事務行為的描述,而事務本身的屬性還需要在事務本身上進行定義。所以我們來定義一個抽象的事務。在定義抽象事物之前我們也要描述事物的具體操作。

有如下定義:

  • 0:建立
  • 1:嘗試提交
  • 2:提交
  • 3:回滾
  • 4:釋放

同樣我們定義一個事務介面來描述上面的抽象事物的行為。NettyTransactionInterface

package com.twjitm.transaction.transaction;

import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
import com.twjitm.transaction.transaction.exception.NettyTransactionException;

/**
 * @author twjitm- [Created on 2018-08-27 10:07]
 * @jdk java version "1.8.0_77"
 */
public interface NettyTransactionInterface {
    /** 啟用,構造*/
    int ACTIVE = 0;
    /** 嘗試提交*/
  int TRYCOMMITED = 1;
    /** 正式提交*/
    int COMMITED = 2;
    /** 正式回滾*/
    int ROLLEDBACK = 3;

    /**
     * 事務提交
     * @throws NettyTransactionException
     */
     void commit() throws NettyTransactionException;

    /**
     * 事務回滾
     * @throws NettyTransactionException
     */
     void rollback() throws NettyTransactionException;

    /**
     * 是否可以提交
     * @return
     */
     boolean canCommit();

    /**
     * 嘗試性提交
     */
     void tryCommit() throws NettyTransactionException;

    /**
     * 獲取事務原因
     * @return
     */
     NettyTransactionCause getCause();

    /**
     * 是否可以建立鎖
     * @return
     */
     boolean createNettyTransactionLock() throws NettyTransactionException;


    /**
     * 釋放鎖
     * @return
     */
     void releaseNettyTransactionLock();
}

在介面中我們還做了抽象事物狀態的定義。描述事物執行到某個階段。接下來我們需要定義一個抽象類來實現這個介面

package com.twjitm.transaction.transaction;

import com.twjitm.transaction.transaction.entity.NettyTransactionEntityInterface;
import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;

import java.util.ArrayList;
import java.util.List;

/**
 * @author twjitm- [Created on 2018-08-27 10:13]
 * @jdk java version "1.8.0_77"
 */
public abstract class AbstractNettyTransaction implements NettyTransactionInterface {

    /**
     * 當前執行狀態
     */
    protected int state;
    /**
     * 事務實體 可以批量提交事物
     */
    public List<NettyTransactionEntityInterface> entities;
    /**
     * 事務原因
     */
    private NettyTransactionCause cause;

    /**
     * 遊戲事務提交結果
     */
    protected NettyTransactionCommitResult transactionTryCommitResult;

    public AbstractNettyTransaction(NettyTransactionCause cause) {
        this.cause = cause;
        this.entities = new ArrayList<>();
        transactionTryCommitResult = NettyTransactionCommitResult.SUCCESS;
        this.state = ACTIVE;
    }

    public void addEntity(NettyTransactionEntityInterface entity) {
        entities.add(entity);
    }

    @Override
    public NettyTransactionCause getCause() {
        return cause;
    }

    @Override
    public boolean canCommit() {
        return transactionTryCommitResult.equals(NettyTransactionCommitResult.SUCCESS);
    }

    public NettyTransactionCommitResult getTransactionTryCommitResult() {
        return transactionTryCommitResult;
    }
}

真正的事物描述是把抽象類中沒有實現的介面都實現了。具有全部功能的NettyTransaction

package com.twjitm.transaction.transaction;

import com.twjitm.transaction.config.GlobalConstants;
import com.twjitm.transaction.transaction.entity.NettyTransactionEntityInterface;
import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
import com.twjitm.transaction.transaction.exception.NettyTransactionException;
import com.twjitm.transaction.utils.TimeUtil;


/**
 * @author twjitm- [Created on 2018-08-27 10:39]
 * @jdk java version "1.8.0_77"
 */
public class NettyTransaction extends AbstractNettyTransaction {


    /**
     * 建立事務鎖等待時間
     */
    private long waitTime;

    /**
     * 建立事務實體:
     *
     * @param cause    事務產生原因
     * @param waitTime 等待時間
     */
    public NettyTransaction(NettyTransactionCause cause, long waitTime) {
        super(cause);
        this.waitTime = waitTime;
    }

    /**
     * 建立事務實體:
     *
     * @param cause 事務產生原因
     */
    public NettyTransaction(NettyTransactionCause cause) {
        super(cause);
        //預設鎖最長等待時間,防止網路延遲,伺服器宕機等情況。
        this.waitTime = GlobalConstants.Lock.TRAINSTACTION_LOCK_KEY_MAX_LIFE;
    }

    @Override
    public void commit() throws NettyTransactionException {
        if (state != TRYCOMMITED) {
            throw new NettyTransactionException();
        }
        this.state = COMMITED;
        for (NettyTransactionEntityInterface entity : entities) {
            if (!entity.needCommit()) {
                continue;
            }
            entity.commit();
        }


    }

    @Override
    public void rollback() throws NettyTransactionException {
        state = ROLLEDBACK;
        for (NettyTransactionEntityInterface entity : entities) {
            entity.rollback();
        }
    }

    @Override
    public void tryCommit() throws NettyTransactionException {
        if (state != ACTIVE) {
            throw new NettyTransactionException();
        }
        this.state = TRYCOMMITED;
        for (NettyTransactionEntityInterface entity : entities) {
            if (!entity.needCommit()) {
                continue;
            }
            //重複提交沒有成功
            NettyTransactionCommitResult transactionCommitResult = entity.tryCommit();
            if (!transactionCommitResult.equals(NettyTransactionCommitResult.SUCCESS)) {
                this.transactionTryCommitResult = transactionCommitResult;
                break;
            }
        }

    }

    /**
     * 是否可以建立一個分散式事物鎖
     *
     * @return
     * @throws NettyTransactionException
     */
    @Override
    public boolean createNettyTransactionLock() throws NettyTransactionException {
        if (state != ACTIVE) {
            throw new NettyTransactionException();
        }
        long startSecond = TimeUtil.getSeconds();
        boolean createFlag;
        if (waitTime > 0) {
            while (true) {
                long currSeconds = TimeUtil.getSeconds();
                createFlag = createNettyTransactionLock(currSeconds);
                if (createFlag) {
                    break;
                }
                try {
                    Thread.sleep(TimeUtil.SECOND);
                } catch (Throwable e) {

                }
                currSeconds = TimeUtil.getSeconds();
                if (startSecond + waitTime < currSeconds) {
                    createFlag = false;
                    break;
                }
            }
        } else {
            startSecond = TimeUtil.getSeconds();
            createFlag = createNettyTransactionLock(startSecond);
        }
        return createFlag;
    }


    private boolean createNettyTransactionLock(long currSeconds) throws NettyTransactionException {
        boolean createFlag = false;
        for (NettyTransactionEntityInterface entity : entities) {
            try {
                createFlag = entity.createNettyTransactionLock(currSeconds);
            } catch (Exception e) {
                throw new NettyTransactionException(e.getMessage());
            }
            if (!createFlag) {
                break;
            }
        }
        return createFlag;
    }

    /**
     * 釋放鎖
     */
    @Override
    public void releaseNettyTransactionLock() {
        for (NettyTransactionEntityInterface entity : entities) {
            entity.releaseNettyTransactionLock();
        }
    }

    @Override
    public String toString() {
        StringBuffer buffer = new StringBuffer();
        buffer.append("transaction ");
        buffer.append(getCause());
        buffer.append(":");
        for (int i = 0; i < entities.size(); i++) {
            NettyTransactionEntityInterface entity = entities.get(i);
            buffer.append(entity.getInfo());
            if (i < entities.size() - 1) {
                buffer.append(",");
            }
        }
        return buffer.toString();


    }
}

上面就是有關事務、事務實體核心功能的描述,下面我們繼續來介紹事務鎖的描述和定義,同樣的方法我們在事務鎖上的操作進行分析,事務鎖實體,其實和jdk自帶的鎖在描述上來說是差不多的。都是為了保護資料而產生的一種行為的定義。

因此我們在專案中的lock包中定義如下類 在這裡插入圖片描述

鎖的行為有:

  • 1:建立
  • 2:登出
  • 3:設定內容

根據上面定義的事務鎖行為。我們有如下介面的描述NettyTransactionLockInterface.java

package com.twjitm.transaction.lock;


import com.twjitm.transaction.transaction.exception.NettyTransactionException;

/**
 * 事務鎖介面
 * <p>
 *     本類面向的是鎖。主要是為事物實體提供原子操作
 *     利用redis 的原子操作實現分散式事物鎖,
 *     該抽象介面定義了鎖的基本操作。
 * </p>
 */
public interface NettyTransactionLockInterface {
    /**
     * 銷燬
     */
     void destroy();

    /**
     * 建立
     * @return
     */
     boolean create(long seconds)  throws NettyTransactionException;

    /**
     * 獲取資訊
     * @return
     */
     String getInfo();

    /**
     * 設定內容
     */
     void setContent(String lockContent);

}


還記得我們前面描述的鎖的種類嗎?我們有寫鎖,讀鎖,強佔鎖,超時鎖等,下面我們來實現這些鎖的具體實體類。NettyTransactionLock

package com.twjitm.transaction.lock;

import com.twjitm.transaction.service.redis.NettyTransactionRedisService;
import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
import com.twjitm.transaction.transaction.enums.NettyTransactionLockStateEnum;
import com.twjitm.transaction.transaction.exception.NettyTransactionException;
import com.twjitm.transaction.utils.TimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * netty 事物鎖,基於redis實現
 * <pre>
 * 寫鎖
 * </pre>
 *
 * @author twjitm- [Created on 2018-08-27 12:05]
 * @jdk java version "1.8.0_77"
 */
public class NettyTransactionLock implements NettyTransactionLockInterface {


    private Logger logger = LoggerFactory.getLogger(NettyTransactionLock.class);
    /**
     * 事物鎖關鍵字
     */
    private String lockKey;
    /**
     * 事物鎖建立需要的redis服務
     */
    private NettyTransactionRedisService redisService;

    /**
     * 事務支出的zookeeper伺服器
     *
     *
     */


    /**
     * 事物產生實體的原因
     */
    private NettyTransactionEntityCause cause;

    /**
     * 事物鎖狀態
     */
    private NettyTransactionLockStateEnum lockStateEnum;

    /**
     * 鎖時長
     */
    private int lockTime;

    /**
     * 鎖強制標識
     */
    private boolean forceFlag;

    /**
     * 鎖內容
     */
    private String lockContext = "";

    /**
     * 初始化一個寫鎖,預設鎖時間為系統配置時間。
     *
     * @param lockKey
     * @param redisService
     * @param cause
     */
    public NettyTransactionLock(String lockKey, NettyTransactionRedisService redisService, NettyTransactionEntityCause cause) {
        super();
        this.lockKey = lockKey;
        this.redisService = redisService;
        this.cause = cause;
        this.lockStateEnum = NettyTransactionLockStateEnum.INIT;
        this.lockTime = TimeUtil.MINUTE_SECOND;

    }


    /**
     * 初始化一個寫鎖。並且指定鎖時間,是否具有強制性。
     *
     * @param lockKey
     * @param redisService
     * @param cause
     * @param lockTime
     * @param forceFlag
     */
    public NettyTransactionLock(String lockKey, NettyTransactionRedisService redisService, NettyTransactionEntityCause cause, int lockTime, boolean forceFlag) {
        super();
        this.lockKey = lockKey;
        this.redisService = redisService;
        this.cause = cause;
        this.lockStateEnum = NettyTransactionLockStateEnum.INIT;
        this.lockTime = lockTime;
        this.forceFlag = forceFlag;


    }

    /**
     * 初始化一個具有內容的寫鎖,並且制定鎖時間和強制性標誌。以及所內容
     *
     * @param lockKey
     * @param redisService
     * @param cause
     * @param lockTime
     * @param forceFlag
     * @param lockContext
     */
    public NettyTransactionLock(String lockKey, NettyTransactionRedisService redisService, NettyTransactionEntityCause cause, int lockTime, boolean forceFlag, String lockContext) {
        super();
        this.lockKey = lockKey;
        this.redisService = redisService;
        this.cause = cause;
        this.lockStateEnum = NettyTransactionLockStateEnum.INIT;
        this.lockTime = lockTime;
        this.forceFlag = forceFlag;
        this.lockContext = lockContext;
    }


    /**
     * <p>
     * 登出一個鎖。
     * 鎖的登出是有條件的。鎖不能再初始化的時候和建立的時候登出。
     * 只能這個鎖建立成功後才能登出。建立一個鎖。將必須使用這個鎖。若建立一個鎖不進行使用
     * 的話,將無法登出這個鎖。只能等鎖時間過期後才能自動登出鎖
     * </p>
     */
    @Override
    public void destroy() {
        if (this.lockStateEnum.equals(NettyTransactionLockStateEnum.INIT) ||
                this.lockStateEnum.equals(
                        NettyTransactionLockStateEnum.CREATE)) {
            return;
        }
        boolean destroyFlag = true;
        if (!lockContext.equals("".trim())) {
            destroyFlag = checkLockContext();
        }
        String realLockKey = getLockKey(lockKey, cause);
        if (destroyFlag) {
            boolean delete = redisService.deleteKey(realLockKey);
            if (!delete) {
                logger.info("居然沒有刪除掉這個key=" + realLockKey);
            }
        }
    }

    /**
     * 檢測鎖內容
     *
     * @return
     */
    private boolean checkLockContext() {
        boolean checkFlag = false;
        String content = redisService.getString(getLockKey(lockKey, cause));
        if (content != null) {
            checkFlag = content.equals(this.lockContext);
        }
        return checkFlag;

    }


    /**
     * 獲取鎖可以
     *
     * @param lockKey
     * @param cause
     * @return
     */
    public String getLockKey(String lockKey, NettyTransactionEntityCause cause) {
        return lockKey + "#" + cause.getCause();
    }


    /**
     * 建立分散式事物鎖,建立一個分散式事物鎖的代價是比較高的,
     * 應為需要將請求訊息傳送到對應的redis伺服器或者是zookeeper伺服器
     * 但是當我們邏輯伺服器和redis不在同一臺伺服器的時候,我們需要走網路層
     * 連線,相當於開啟一個tcp連線通道。這條通道主要是為了我們能夠與redis或者zookeeper
     * 伺服器進行通訊,為可防止單點問題,我們可以將redis做成叢集模式,同樣zookeeper也
     * 一樣。當然在這個地方我們預設使用redis實現分散式所務鎖,當別的邏輯伺服器申請鎖的
     * 的時候也會進行建立。利用redis的原子性,保證本鎖的原子性。
     *
     * @param seconds
     * @return
     * @throws NettyTransactionException
     */
    @Override
    public boolean create(long seconds) throws NettyTransactionException {
        this.lockStateEnum = NettyTransactionLockStateEnum.CREATE;
        boolean createFlag;
        String realLockKey = getLockKey(lockKey, cause);
        try {
            //設定鎖標識
            createFlag = redisService.setNxString(realLockKey, lockContext, lockTime);
            if (createFlag) {
                this.lockStateEnum = NettyTransactionLockStateEnum.SUCCESS;
                logger.info("建立鎖成功");
                redisService.expire(realLockKey, lockTime);
            } else {
                if (forceFlag) {
                    this.lockStateEnum = NettyTransactionLockStateEnum.SUCCESS;
                    redisService.setString(realLockKey, lockContext, lockTime);
                    redisService.expire(realLockKey, lockTime);
                    createFlag = true;
                    logger.info("建立強制鎖:" + realLockKey + ",過期時間長度為: " + lockTime);
                } else {
                    createFlag = false;
                    logger.info("建立鎖失敗" + realLockKey + ",過期時間為: " + lockTime);
                }
            }

        } catch (Exception e) {
            throw new NettyTransactionException("建立鎖發生意想不到的錯誤,請檢查");
        }
        return createFlag;
    }

    @Override
    public String getInfo() {
        return lockKey + cause + checkLockContext() + lockTime;
    }

    @Override
    public void setContent(String lockContent) {
        this.lockContext = lockContent;
    }
}

其實程式碼中已經有很詳細的介紹了。每個部分的功能都可以在註釋中可以看到。正如事務鎖的建立。程式碼做了詳細的描述。在這就省略了。

讀鎖

package com.twjitm.transaction.lock;

import com.twjitm.transaction.service.redis.NettyTransactionRedisService;
import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
import com.twjitm.transaction.transaction.enums.NettyTransactionLockStateEnum;
import com.twjitm.transaction.transaction.exception.NettyTransactionException;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * netty 遊戲事物鎖,基於redis實現的分散式遊戲事務鎖
 * <pre>
 *     讀鎖
 * </pre>
 *
 * @author twjitm - [Created on 2018-08-27 12:14]
 * @jdk java version "1.8.0_77"
 */
public class NettyTransactionReadLock implements NettyTransactionLockInterface {
    private Logger logger = LoggerFactory.getLogger(NettyTransactionReadLock.class);
    /**
     * 事物鎖key
     */
    private String lockKey;
    /**
     * 鎖提供的redis
     */
    private NettyTransactionRedisService redisService;
    /**
     * 事物鎖引數原因
     */
    private NettyTransactionEntityCause cause;

    /**
     * 分散式讀鎖狀態
     */
    private NettyTransactionLockStateEnum lockState;

    /**
     * 分散式讀鎖內容
     */
    private String lockContext;


    public NettyTransactionReadLock(String lockKey, NettyTransactionRedisService redisService, NettyTransactionEntityCause cause) {
        super();
        this.lockKey = lockKey;
        this.redisService = redisService;
        this.cause = cause;
        this.lockState = NettyTransactionLockStateEnum.INIT;


    }

    /**
     * 登出一個鎖
     */
    @Override
    public void destroy() {
        if (this.lockState == NettyTransactionLockStateEnum.INIT || this.lockState == NettyTransactionLockStateEnum.CREATE) {
            return;
        }
        boolean exists = redisService.exists(getLockKey(lockKey, cause));
        if (exists && !StringUtils.isEmpty(lockContext)) {
            exists = this.checkLockContext();
            if (exists) {
                redisService.deleteKey(getLockKey(lockKey, cause));
            }
        }


    }

    @Override
    public boolean create(long seconds) throws NettyTransactionException {
        this.lockState = NettyTransactionLockStateEnum.CREATE;
        //檢測值是否存在
        boolean exists = redisService.exists(getLockKey(lockKey, cause));
        //檢測內容是否為空
        if (exists && !StringUtils.isEmpty(lockContext)) {
            exists = this.checkLockContext();
        }
        return exists;
    }


    private boolean checkLockContext() {

        boolean checkFlag = false;
        String realLockKey = getLockKey(lockKey, cause);
        String content = redisService.getString(realLockKey);
        if (!StringUtils.isEmpty(content)) {
            logger.info("read content realLockKey:" + realLockKey);
            checkFlag = content.equals(this.lockContext);
        }
        return checkFlag;
    }

    /**
     * 獲取鎖可以
     *
     * @param lockKey
     * @param cause
     * @return
     */
    public String getLockKey(String lockKey, NettyTransactionEntityCause cause) {
        return lockKey + "#" + cause.getCause();
    }


    @Override
    public String getInfo() {
        return this.lockKey + this.cause + this.lockContext;
    }

    @Override
    public void setContent(String lockContent) {
        this.lockContext = lockContent;
    }
}

基於zookeeper實現的寫鎖。NettyTransactionZkLock

package com.twjitm.transaction.lock;

import com.twjitm.transaction.service.zookeeper.NettyTransactionZookeeperService;
import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
import com.twjitm.transaction.transaction.enums.NettyTransactionLockStateEnum;
import com.twjitm.transaction.transaction.exception.NettyTransactionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 基於zookeeper分散式事務鎖實體,zookeeper 實現分散式鎖
 * 基於zookeeper實現分散式鎖存在的缺點:
 * 由於zookeeper天生的特性,我們在建立節點的時候最好建立臨時節點
 * 防止長期佔用鎖,造成死鎖。由於未知原因,可能程式釋放鎖失敗。
 *
 * @author twjitm- [Created on 2018-08-29 14:52]
 */
public class NettyTransactionZkLock implements NettyTransactionLockInterface {

    private Logger logger = LoggerFactory.getLogger(NettyTransactionZkLock.class);
    /**
     * 事物鎖關鍵字
     */
    private String lockKey;
    /**
     * 事物鎖建立需要的zookeeper服務
     */

    private NettyTransactionZookeeperService zookeeperService;

    /**
     * 事物鎖引數原因
     */
    private NettyTransactionEntityCause cause;

    /**
     * 事物鎖裝填
     */
    private NettyTransactionLockStateEnum lockStateEnum;

    /**
     * 分散式讀鎖內容
     */
    private String lockContext="";


    public NettyTransactionZkLock(String lockKey, NettyTransactionZookeeperService
            zookeeperService,
                                  NettyTransactionEntityCause cause) {
        super();
        this.lockKey = lockKey;
        this.zookeeperService = zookeeperService;
        this.cause = cause;
        this.lockContext="";
    }

    public NettyTransactionZkLock(String lockKey,
                                  NettyTransactionZookeeperService zookeeperService,
                                  NettyTransactionEntityCause cause,
                                  NettyTransactionLockStateEnum lockState) {
        super();
        this.lockKey = lockKey;
        this.zookeeperService = zookeeperService;
        this.cause = cause;
        this.lockStateEnum = lockState;
    }

    public NettyTransactionZkLock(String lockKey, NettyTransactionZookeeperService
            zookeeperService,
                                  NettyTransactionEntityCause cause,
                                  NettyTransactionLockStateEnum lockState, String
                                          lockContext) {
        super();
        this.lockKey = lockKey;
        this.zookeeperService = zookeeperService;
        this.cause = cause;
        this.lockStateEnum = lockState;
        this.lockContext = lockContext;
    }

    /**
     * 登出一個鎖
     */
    @Override
    public void destroy() {
        //這兩種狀態不能登出鎖
        if (this.lockStateEnum.equals(NettyTransactionLockStateEnum.INIT) ||
                this.lockStateEnum.equals(
                        NettyTransactionLockStateEnum.CREATE)) {
            return;
        }
        String realLockKey = getLockKey(lockKey, cause);
        boolean delete = zookeeperService.deleteNode(realLockKey);
        if (!delete) {
            logger.info("居然沒有刪除掉這個key=" + realLockKey);
        }
    }


    /**
     * 建立鎖節點
     *
     * @param lockKey
     * @param cause
     * @return
     */
    public String getLockKey(String lockKey, NettyTransactionEntityCause cause) {
        return lockKey + "_" + cause.getCause();
    }


    /**
     * 建立鎖
     *
     * @param
     * @return
     * @throws NettyTransactionException
     */
    @Override
    public boolean create(long seconds) throws NettyTransactionException {
        this.lockStateEnum = NettyTransactionLockStateEnum.CREATE;
        boolean createFlag;
        String realKey = getLockKey(lockKey, cause);
        //建立節點
        createFlag = zookeeperService.createNode(realKey, lockContext);
        if (createFlag) {
            this.lockStateEnum = NettyTransactionLockStateEnum.SUCCESS;
            logger.info("建立鎖成功" + this.getInfo());
        } else{
            logger.info("獲得鎖失敗" + this.getInfo());
        }
        return createFlag;
    }

    @Override
    public String getInfo() {
        return this.lockKey + cause.getCause() + this.lockStateEnum.name() +
                this.lockContext;
    }

    @Override
    public void setContent(String lockContent) {
        this.lockContext = lockContent;
    }
}

到此,有關事物鎖的定義就完成。基礎元件定義完成之後我們需要對外提供事務鎖服務。這是我們系統所具有的意義。對外提供分散式事務鎖,是我們的核心功能。上訴的功能執行為了實現這個分散式事務鎖功能的必備元件,因此我們定義一個服務介面。

package com.twjitm.transaction.service.transaction;

import com.twjitm.transaction.transaction.entity.AbstractNettyTransactionEntity;
import com.twjitm.transaction.transaction.entity.AbstractNettyTransactionZkEntity;
import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;

/**
 * 事物對外提供的服務
 * 可以批量提交事物
 *
 * @author twjitm- [Created on 2018-08-27 15:42]
 * @jdk java version "1.8.0_77"
 */
public interface NettyTransactionService {

    /**
     * redis 模式提交事務
     * @param cause
     * @param abstractGameTransactionEntity
     * @return
     */
    NettyTransactionCommitResult commitTransaction(NettyTransactionCause cause, AbstractNettyTransactionEntity... abstractGameTransactionEntity);

    /**
     * redis 模式提交事務
     * @param cause
     * @param waitTime
     * @param abstractGameTransactionEntity
     * @return
     */
    NettyTransactionCommitResult commitTransaction(NettyTransactionCause cause, long waitTime, AbstractNettyTransactionEntity... abstractGameTransactionEntity);

    /**
     * zookeeper 模式來提交事物鎖
     *
     * @param cause
     * @param abstractNettyTransactionZkEntities
     * @return
     */

    NettyTransactionCommitResult commitTransaction(NettyTransactionCause cause, AbstractNettyTransactionZkEntity... abstractNettyTransactionZkEntities);


}

介面定義的幾個方法都是具有相同的意義。都是提交事物。只不過一個是redis實現的,一個是zookeeper實現的。介面定義好了我們需要實現介面中的方法。下面是本系統的最核心的關鍵之處。

package com.twjitm.transaction.service.transaction;

import com.twjitm.transaction.transaction.NettyTransaction;
import com.twjitm.transaction.transaction.entity.AbstractNettyTransactionEntity;
import com.twjitm.transaction.transaction.entity.AbstractNettyTransactionZkEntity;
import com.twjitm.transaction.transaction.entity.NettyTransactionEntityInterface;
import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
import com.twjitm.transaction.transaction.exception.NettyTransactionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * 事物提交服務
 *
 * @author twjitm- [Created on 2018-08-27 15:44]
 * @jdk java version "1.8.0_77"
 */
@Service
public class NettyTransactionServiceImpl implements NettyTransactionService {
    Logger logger = LoggerFactory.getLogger(NettyTransactionServiceImpl.class);

    @Override
    public NettyTransactionCommitResult commitTransaction(NettyTransactionCause transactionCause, AbstractNettyTransactionEntity... abstractGameTransactionEntity) {
        NettyTransaction transaction = new NettyTransaction(transactionCause);
        return commitTransaction(transaction, abstractGameTransactionEntity);
    }

    @Override
    public NettyTransactionCommitResult commitTransaction(NettyTransactionCause gameTransactionCause, long waitTime, AbstractNettyTransactionEntity... abstractGameTransactionEntity) {
        NettyTransaction transaction = new NettyTransaction(gameTransactionCause, waitTime);
        return commitTransaction(transaction, abstractGameTransactionEntity);
    }

    @Override
    public NettyTransactionCommitResult commitTransaction(NettyTransactionCause cause, AbstractNettyTransactionZkEntity... abstractNettyTransactionZkEntities) {
        NettyTransaction transaction = new NettyTransaction(cause);

        return commitTransaction(transaction, abstractNettyTransactionZkEntities);
    }

    /**
     * 二階段和三階段的區別
     * http://www.hollischuang.com/archives/681
     *
     * @param transaction                     事務
     * @param abstractGameTransactionEntities 事務實體集和
     * @return                                事務執行返回結果
     */
    private NettyTransactionCommitResult commitTransaction(NettyTransaction transaction, NettyTransactionEntityInterface... abstractGameTransactionEntities) {
        NettyTransactionCommitResult tryCommitResult = NettyTransactionCommitResult.SUCCESS;
        for (NettyTransactionEntityInterface entityInterface : abstractGameTransactionEntities) {
            transaction.addEntity(entityInterface);
        }
        try {
            //如果能夠建立分散式伺服器鎖
            if (transaction.createNettyTransactionLock()) {
                logger.info("成功獲得鎖: " + transaction.toString());
                logger.info("嘗試提交鎖: " + transaction.toString());
                transaction.tryCommit();
                if (transaction.canCommit()) {
                    logger.info("正式提交鎖: " + transaction.toString());
                    transaction.commit();
                    logger.info("提交鎖成功: " + transaction.toString());
                } else {
                    logger.info("重複提交鎖: " + transaction.toString());
                    tryCommitResult = transaction.getTransactionTryCommitResult();
                    logger.info("重複提交鎖失敗: " + transaction.toString());
                }
            } else {
                logger.info("獲得鎖失敗: " + transaction.toString());
                tryCommitResult = NettyTransactionCommitResult.LOCK_ERROR;
            }
        } catch (Exception e) {
            logger.info("提交鎖發生異常: " + transaction.toString());
            try {
                logger.info("開始回滾鎖: " + transaction.toString());
                transaction.rollback();
                logger.info("回滾鎖成功: " + transaction.toString());
            } catch (NettyTransactionException e1) {
                e1.printStackTrace();
                logger.info("回滾鎖發生異常: " + transaction.toString());
            }
            //異常事務原因
            tryCommitResult = NettyTransactionCommitResult.COMMON_ERROR;
            if (e instanceof NettyTransactionException) {
                NettyTransactionException exception = (NettyTransactionException) e;
                NettyTransactionCommitResult tempGameTransactionTryCommitResult =
                        exception.getResult();
                if (tempGameTransactionTryCommitResult != null) {
                    tryCommitResult = tempGameTransactionTryCommitResult;
                }
            }

        } finally {
            //釋放鎖
            logger.info("釋放鎖開始: " + transaction.toString());
            transaction.releaseNettyTransactionLock();
            logger.info("釋放鎖成功: " + transaction.toString());
        }
        return tryCommitResult;
    }


}

由於篇幅原因,我們就不統統描述全部程式碼了。其實程式碼已經做了詳細的介紹。相信有點java基礎的同學都能夠看明白。只不過此工程描述的是一種思想而已。 到此核心程式碼基本介紹完。那我們就來測試一下。前面在先睹為快的地方的時候我們已經看到了。下面我們來詳細說明一下如何使用分散式事務鎖框架。

如何使用

互斥鎖 首先我們定義一個實體。MutexEntity需要繼承AbstractNettyTransactionEntity實現為實現的方法。

package com.twjitm.transaction.entity;

import com.twjitm.transaction.service.redis.NettyTransactionRedisService;
import com.twjitm.transaction.transaction.entity.AbstractNettyTransactionEntity;
import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
import com.twjitm.transaction.transaction.exception.NettyTransactionException;

import java.util.BitSet;

/**
 * 互斥鎖測試 實體
 *
 * @author twjtim- [Created on 2018-08-27 18:12]
 * @jdk java version "1.8.0_77"
 */
public class MutexEntity extends AbstractNettyTransactionEntity {

    NettyTransactionRedisService redisService;
    String testKey;

    public MutexEntity(NettyTransactionEntityCause cause, String key,
                       NettyTransactionRedisService redisService) {
        super(cause, key, redisService);
        this.redisService = redisService;
        this.testKey = key;
    }

    @Override
    public void commit() throws NettyTransactionException {
        redisService.setString("mutex_test", "twjitm");
        throw new NullPointerException();
    }

    @Override
    public void rollback() throws NettyTransactionException {
        BitSet bitset = getProgressBitSet();
        for (int i = 0; i < bitset.size(); i++) {
            //@TODO 不同粒度回滾
        }
        redisService.deleteKey("mutex_test");
    }

    @Override
    public NettyTransactionCommitResult tryCommit() throws NettyTransactionException {

        return NettyTransactionCommitResult.SUCCESS;
    }
}

在測試中我們模擬在提交鎖階段丟擲一個空指標異常,在回滾階段我們來做一些操作。 測試類

package com.twjitm.transaction.mtex;

import com.twjitm.transaction.entity.MutexEntity;
import com.twjitm.transaction.service.redis.impl.NettyTransactionRedisServiceImpl;
import com.twjitm.transaction.service.transaction.NettyTransactionServiceImpl;
import com.twjitm.transaction.spring.TestSpring;
import com.twjitm.transaction.transaction.enums.NettyTransactionCause;
import com.twjitm.transaction.transaction.enums.NettyTransactionCommitResult;
import com.twjitm.transaction.transaction.enums.NettyTransactionEntityCause;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.util.Assert;

/**
 * 互斥鎖測試
 *
 * @author twjitm- [Created on 2018-08-28 10:31]
 */
public class TestMutex {

    public static void main(String[] args) {



        ClassPathXmlApplicationContext applicationContext = TestSpring.initSpring();
        NettyTransactionRedisServiceImpl nettyTransactionRedisService =
                (NettyTransactionRedisServiceImpl) applicationContext.getBean
                        ("nettyTransactionRedisService");

        NettyTransactionServiceImpl nettyTransactionService = (NettyTransactionServiceImpl) applicationContext.getBean("nettyTransactionServiceImpl");


        NettyTransactionEntityCause cause = new NettyTransactionEntityCause("mutex");

        MutexEntity mutexEntity = new MutexEntity(cause, "mutex", nettyTransactionRedisService);
        NettyTransactionCause transactionCause = new NettyTransactionCause("mutex");

        NettyTransactionCommitResult result =
                nettyTransactionService.commitTransaction(transactionCause, mutexEntity);
        System.out.println(result.getResult());
        Assert.isTrue(true,"");
    }
}

用一個簡單的main函式啟動系統,測試功能。 執行結果: 在這裡插入圖片描述

下面我們通過一個流程圖來簡單總結本系統 在這裡插入圖片描述

專案原始碼開源道我的GitHub,有需要的同學可以下載。歡迎star