1. 程式人生 > >shardingjdbc (九)-最大努力型事務

shardingjdbc (九)-最大努力型事務

一 序:

Sharding-JDBC由於效能方面的考量,決定不支援強一致性分散式事務。目前支援的:

  • Best efforts delivery transaction (已經實現).
  • Try confirm cancel transaction (待定).

最大努力送達型:在分散式資料庫的場景下,相信對於該資料庫的操作最終一定可以成功,所以通過最大努力反覆嘗試送達操作。


實際上可以看看圖上的流程,不管執行結果如何,執行前事件都會記錄事務日誌;執行事件型別包括3種:BEFORE_EXECUTE,EXECUTE_FAILURE和EXECUTE_SUCCESS;另外,這裡的”同步“不是絕對的同步執行,而是通過google-guava的EventBus釋出事件後,在監聽端判斷是EXECUTE_FAILURE事件,最多重試syncMaxDeliveryTryTimes次;

適用場景

  • 根據主鍵刪除資料。
  • 更新記錄永久狀態,如更新通知送達狀態。

使用限制

使用最大努力送達型柔性事務的SQL需要滿足冪等性。

  • INSERT語句要求必須包含主鍵,且不能是自增主鍵。
  • UPDATE語句要求冪等,不能是UPDATE xxx SET x=x+1
  • DELETE語句無要求。

這裡看看官網demo:http://shardingjdbc.io/document/legacy/2.x/cn/02-guide/transaction/

整個過程通過如下 元件 完成:
柔性事務管理器
最大努力送達型柔性事務
最大努力送達型事務監聽器
事務日誌儲存器

最大努力送達型非同步作業

二 柔性事務管理器

      之前的《SQL執行》對ExecutorEngine的分析可知,sharding-jdbc在執行SQL前後,分別呼叫EventBusInstance.getInstance().post()提交了事件,那麼呼叫EventBusInstance.getInstance().register()的地方,就是柔性事務處理的地方,通過檢視原始碼的呼叫關係可知,只有SoftTransactionManager.init()呼叫了EventBusInstance.getInstance().register(),所以柔性事務實現的核心在SoftTransactionManager這裡;

2.1 SoftTransactionManager     

  柔性事務管理器,SoftTransactionManager 實現,負責對柔性事務配置( SoftTransactionConfiguration ) 、柔性事務( AbstractSoftTransaction )的管理。
public final class SoftTransactionManager {

    private static final String TRANSACTION = "transaction";

    private static final String TRANSACTION_CONFIG = "transactionConfig";

    @Getter
    private final SoftTransactionConfiguration transactionConfig;
    
    /**
     * Initialize B.A.S.E transaction manager.
     * 
     * @throws SQLException SQL exception
     */
    public void init() throws SQLException {
        // 初始化 最大努力送達型事務監聽器
        EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
        // 初始化 事務日誌資料庫儲存表
        if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
            Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());
            createTable();
        }
         // 初始化 內嵌的最大努力送達型非同步作業
        if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {
            new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();
        }
    }

通過上面的程式碼,可知這裡的涉及的重點如下:

   將最大努力送達型事務監聽器( BestEffortsDeliveryListener )註冊到事務匯流排 ( EventBus ),
    當使用資料庫儲存事務日誌( TransactionLog ) 時,若事務日誌表( transaction_log )不存在則進行建立.

    當配置使用內嵌的最大努力送達型非同步作業( NestedBestEffortsDeliveryJob ) 時,進行初始化

2.2 SoftTransactionConfiguration

 柔性事務配置物件

public class SoftTransactionConfiguration {
    
    /**
     * Data source for transaction manager.
     */
    @Getter(AccessLevel.NONE)
    private final DataSource targetDataSource;
    
    /**
     * Max synchronized delivery try times.
     */
    private int syncMaxDeliveryTryTimes = 3;
    
    /**
     * Transaction log storage type.
     */
    private TransactionLogDataSourceType storageType = RDB;
    
    /**
     * Transaction log data source.
     */
    private DataSource transactionLogDataSource;
    
    /**
     * Embed best efforts delivery B.A.S.E transaction asynchronized job configuration.
     */
    private Optional<NestedBestEffortsDeliveryJobConfiguration> bestEffortsDeliveryJobConfiguration = Optional.absent();

2.3 柔性事務 

在 Sharding-JDBC 裡,目前柔性事務分成兩種:

BEDSoftTransaction :最大努力送達型柔性事務
TCCSoftTransaction :TCC型柔性事務
繼承 AbstractSoftTransaction
public abstract class AbstractSoftTransaction {
    
    private boolean previousAutoCommit;
    
    @Getter
    private ShardingConnection connection;
    
    @Getter
    private SoftTransactionType transactionType;
    
    @Getter
    private String transactionId;

提供了開始事務beginInternal,結束事務end供子類呼叫。

protected final void beginInternal(final Connection conn, final SoftTransactionType type) throws SQLException {
        // TODO 判斷如果在傳統事務中,則拋異常
        Preconditions.checkArgument(conn instanceof ShardingConnection, "Only ShardingConnection can support eventual consistency transaction.");
        //   設定執行錯誤,不丟擲異常
        ExecutorExceptionHandler.setExceptionThrown(false);
        connection = (ShardingConnection) conn;
        transactionType = type;
        //設定自動提交狀態
        previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(true);
        // TODO replace to snowflake:以後用snowflake生成事務編號替換uuid
        transactionId = UUID.randomUUID().toString();
    }

注意點:1 。SQL異常不丟擲,會繼續執行。

              2. 自動提交,所以不支援回滾。

	public final void end() throws SQLException {
        if (null != connection) {
            ExecutorExceptionHandler.setExceptionThrown(true);
            connection.setAutoCommit(previousAutoCommit);
            SoftTransactionManager.closeCurrentTransactionManager();
        }
    }
     /**
     * Close transaction manager from current thread.
     */
    static void closeCurrentTransactionManager() {
        ExecutorDataMap.getDataMap().put(TRANSACTION, null);
        ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, null);
    }
end作用是事務結束後清理執行緒變數。BEDSoftTransaction
public class BEDSoftTransaction extends AbstractSoftTransaction {
    
    /**
     * Begin transaction.
     * 
     * @param connection database connection
     * @throws SQLException SQL exception
     */
    public void begin(final Connection connection) throws SQLException {
        beginInternal(connection, SoftTransactionType.BestEffortsDelivery);
    }
}

2.4 建立柔性事務

通過呼叫 SoftTransactionManager.getTransaction() 建立柔性事務物件:

public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
        AbstractSoftTransaction result;
        switch (type) {
            case BestEffortsDelivery: 
                result = new BEDSoftTransaction();
                break;
            case TryConfirmCancel:
                result = new TCCSoftTransaction();
                break;
            default: 
                throw new UnsupportedOperationException(type.toString());
        }
        // TODO don't support nested transaction(巢狀事務), should configurable in future
        if (getCurrentTransaction().isPresent()) {
            throw new UnsupportedOperationException("Cannot support nested transaction.");
        }
        ExecutorDataMap.getDataMap().put(TRANSACTION, result);
        ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
        return result;
    }

獲取柔性事務配置:

public static Optional<SoftTransactionConfiguration> getCurrentTransactionConfiguration() {
        Object transactionConfig = ExecutorDataMap.getDataMap().get(TRANSACTION_CONFIG);
        return (null == transactionConfig)
                ? Optional.<SoftTransactionConfiguration>absent()
                : Optional.of((SoftTransactionConfiguration) transactionConfig);
    }
public static Optional<AbstractSoftTransaction> getCurrentTransaction() {
        Object transaction = ExecutorDataMap.getDataMap().get(TRANSACTION);
        return (null == transaction)
                ? Optional.<AbstractSoftTransaction>absent()
                : Optional.of((AbstractSoftTransaction) transaction);
    }

3 事務日誌儲存器

  柔性事務執行過程中,會通過事務日誌( TransactionLog ) 記錄每條 SQL 執行狀態:
        SQL 執行前,記錄一條事務日誌
        SQL 執行成功,移除對應的事務日誌
通過實現事務日誌儲存器介面( TransactionLogStorage ),提供儲存功能。有兩個實現類: 
1. RdbTransactionLogStorage:關係型資料庫儲存柔性事務日誌; 
2. MemoryTransactionLogStorage:記憶體儲存柔性事務日誌;

3.1 TransactionLogStorage 

public interface TransactionLogStorage {
    
    /**
     * Save transaction log.
     * 
     * @param transactionLog transaction log
     */
    void add(TransactionLog transactionLog);
    
    /**
     * Remove transaction log.
     * 
     * @param id transaction log id
     */
    void remove(String id);
    
    /**
     * Find eligible transaction logs.
     * 
     * <p>To be processed transaction logs: </p>
     * <p>1. retry times less than max retry times.</p>
     * <p>2. transaction log last retry timestamp interval early than last retry timestamp.</p>
     * 
     * @param size size of fetch transaction log
     * @param maxDeliveryTryTimes max delivery try times
     * @param maxDeliveryTryDelayMillis max delivery try delay millis
     * @return eligible transaction logs
     */
    List<TransactionLog> findEligibleTransactionLogs(int size, int maxDeliveryTryTimes, long maxDeliveryTryDelayMillis);
    
    /**
     * Increase asynchronized delivery try times.
     * 
     * @param id transaction log id
     */
    void increaseAsyncDeliveryTryTimes(String id);
    
    /**
     * Process transaction logs.
     *
     * @param connection connection for business app
     * @param transactionLog transaction log
     * @param maxDeliveryTryTimes max delivery try times
     * @return process success or not
     */
    boolean processData(Connection connection, TransactionLog transactionLog, int maxDeliveryTryTimes);
}

註釋的比較清晰了,翻一下:

TransactionLogStorage中幾個重要介面在兩個實現類中的實現: 
* void add(TransactionLog):Rdb實現就是把事務日誌TransactionLog 插入到transaction_log表中,Memory實現就是把事務日誌儲存到ConcurrentHashMap中; 
* void remove(String id):Rdb實現就是從transaction_log表中刪除事務日誌,Memory實現從ConcurrentHashMap中刪除事務日誌; 
* void increaseAsyncDeliveryTryTimes(String id):非同步增加送達重試次數,即TransactionLog中的asyncDeliveryTryTimes+1;Rdb實現就是update transaction_log表中async_delivery_try_times欄位加1;Memory實現就是TransactionLog中重新給asyncDeliveryTryTimes賦值new AtomicInteger(transactionLog.getAsyncDeliveryTryTimes()).incrementAndGet(); 
* findEligibleTransactionLogs(): 查詢需要處理的事務日誌,條件是:①非同步處理次數async_delivery_try_times小於引數最大處裡次數maxDeliveryTryTimes,②transaction_type是BestEffortsDelivery,③系統當前時間與事務日誌的建立時間差要超過引數maxDeliveryTryDelayMillis,每次最多查詢引數size條;Rdb實現通過sql從transaction_log表中查詢,Memory實現遍歷ConcurrentHashMap匹配符合條件的TransactionLog; 
* boolean processData():Rdb實現執行TransactionLog中的sql,如果執行過程中丟擲異常,那麼呼叫increaseAsyncDeliveryTryTimes()增加送達重試次數並丟擲異常,如果執行成功,刪除事務日誌,並返回true;Memory實現直接返回false(因為processData()的目的是執行TransactionLog中的sql,而Memory型別無法觸及資料庫,所以返回false)

3.2 RdbTransactionLogStorage 介面實現原始碼

public final class RdbTransactionLogStorage implements TransactionLogStorage {
    
    private final DataSource dataSource;
    
    @Override
    public void add(final TransactionLog transactionLog) {
        String sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";
        try (...
    }
        
    public void remove(final String id) {
      String sql = "DELETE FROM `transaction_log` WHERE `id`=?;";
      ...
    }
    public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {
    //最多去size條
    List<TransactionLog> result = new ArrayList<>(size);
    String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` "
        + "FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;";
    try (Connection conn = dataSource.getConnection()) {  
    。。。。
    }
    public void increaseAsyncDeliveryTryTimes(final String id) {
    // 更新處理次數+1
    String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";
    ...
    }
    public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {
        try (
            Connection conn = connection;
            // 重試執行TransactionLog中的sql
            PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {
            for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {
                preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));
            }
            preparedStatement.executeUpdate();
        } catch (final SQLException ex) {
            //如果丟擲異常,表示執行sql失敗,那麼把增加處理次數並把異常丟擲去;
            increaseAsyncDeliveryTryTimes(transactionLog.getId());
            throw new TransactionCompensationException(ex);
        }
        // 如果沒有丟擲異常,表示執行sql成功,那麼刪除該事務日誌;
        remove(transactionLog.getId());
        return true;
    }
  • 該方法會被最大努力送達型非同步作業呼叫到

TransactionLog (transaction_log) 資料庫表結構如下:

欄位名字資料庫型別備註
id事件編號VARCHAR(40)EventBus 事件編號,非事務編號
transaction_type柔性事務型別VARCHAR(30)
data_source真實資料來源名VARCHAR(255)
sql執行 SQLTEXT已經改寫過的 SQL
parameters佔位符引數TEXTJSON 字串儲存
creation_time記錄時間LONG
async_delivery_try_times已非同步重試次數INT

4. 最大努力送達型事務監聽器

最大努力送達型事務監聽器,BestEffortsDeliveryListener,負責記錄事務日誌、同步重試執行失敗 SQL。

public final class BestEffortsDeliveryListener {
    
    @Subscribe
    @AllowConcurrentEvents
    //從方法可知,只監聽DML執行事件
    public void listen(final DMLExecutionEvent event) {
        //判斷是否需要繼續,判斷邏輯為:事務存在,並且是BestEffortsDelivery型別事務
        if (!isProcessContinuously()) {
            return;
        }
        // 從柔性事務管理器中得到柔性事務配置
        SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
        //得到配置的柔性事務儲存器
        TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
        //得到最大努力送達型事務
        BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
          // 根據事件型別做不同處理
        switch (event.getEventExecutionType()) {
              // 如果執行前事件,那麼先儲存事務日誌;
            case BEFORE_EXECUTE:
                //TODO for batch SQL need split to 2-level records
                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), 
                        event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
                return;
                 // 如果執行成功事件,那麼刪除事務日誌;
            case EXECUTE_SUCCESS: 
                transactionLogStorage.remove(event.getId());
                return;
                //執行失敗,同步重試
            case EXECUTE_FAILURE: 
                boolean deliverySuccess = false;
                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
                    //如果執行成功,那麼返回,不需要再嘗試
                    if (deliverySuccess) {
                        return;
                    }
                    boolean isNewConnection = false;
                    Connection conn = null;
                    PreparedStatement preparedStatement = null;
                    try {
                         // 獲得資料庫連線
                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
                        // 通過執行"select 1"判斷conn是否是有效的資料庫連線;如果不是有效的資料庫連線,釋放掉並重新獲取一個數據庫連線;
                        // 為啥呢?因為可能執行失敗是資料庫連線異常,所以再判斷一次
                        if (!isValidConnection(conn)) {
                            bedSoftTransaction.getConnection().release(conn);
                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
                            isNewConnection = true;
                        }
                        preparedStatement = conn.prepareStatement(event.getSql());
                        //同步重試,
                        //TODO for batch event need split to 2-level records(對於批量事件需要解析成兩層列表)
                        for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
                            preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
                        }
                        //因為只監控DML,所以呼叫executeUpdate()
                        preparedStatement.executeUpdate();
                        deliverySuccess = true;
                        //執行成功;根據id刪除事務日誌;
                        transactionLogStorage.remove(event.getId());
                    } catch (final SQLException ex) {
                        // 如果sql執行有異常,那麼輸出error日誌
                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
                    } finally {
                    // 關閉連結
                        close(isNewConnection, conn, preparedStatement);
                    }
                }
                return;
            default: 
               // 預設對於支援三種事件型別之外的,丟擲異常
                throw new UnsupportedOperationException(event.getEventExecutionType().toString());
        }
    }
    private boolean isProcessContinuously() {
        return SoftTransactionManager.getCurrentTransaction().isPresent()
                && SoftTransactionType.BestEffortsDelivery == SoftTransactionManager.getCurrentTransaction().get().getTransactionType();
    }

BestEffortsDeliveryListener 通過 EventBus 實現監聽 SQL 的執行。Sharding-JDBC 而是通過google-guava的EventBus釋出事件。具體可以結合《SQL執行》來看。
QL 執行前,插入事務日誌
SQL 執行成功,移除事務日誌

SQL 執行失敗,根據柔性事務配置( SoftTransactionConfiguration )同步的事務送達的最大嘗試次數( syncMaxDeliveryTryTimes )進行多次重試直到成功。

最後進行關閉連結close()

private void close(final boolean isNewConnection, final Connection conn, final PreparedStatement preparedStatement) {
        if (null != preparedStatement) {
            try {
                preparedStatement.close();
            } catch (final SQLException ex) {
                log.error("PreparedStatement closed error:", ex);
            }
        }
        if (isNewConnection && null != conn) {
            try {
                conn.close();
            } catch (final SQLException ex) {
                log.error("Connection closed error:", ex);
            }
        }
    }

5 最大努力送達型非同步作業

   當最大努力送達型事務監聽器( BestEffortsDeliveryListener )多次同步重試失敗後,交給最大努力送達型非同步作業進行多次非同步重試,並且多次執行有固定間隔。
Sharding-JDBC 提供了兩個最大努力送達型非同步作業實現:
NestedBestEffortsDeliveryJob :內嵌的最大努力送達型非同步作業
BestEffortsDeliveryJob :最大努力送達型非同步作業

邏輯類似,只是前者無法實現高可用,可以在測試環境用。

5.1 BestEffortsDeliveryJob 

核心原始碼在模組sharding-jdbc-transaction-async-job中。該模組是一個獨立非同步處理模組,使用者決定是否需要啟用,原始碼比較少。寶結構如下圖所示:


Main方法的核心原始碼如下:

public static void main(final String[] args) throws Exception {
    // CHECKSTYLE:ON
        try (InputStreamReader inputStreamReader = new InputStreamReader(BestEffortsDeliveryJobMain.class.getResourceAsStream("/conf/config.yaml"), "UTF-8")) {
            BestEffortsDeliveryConfiguration config = new Yaml(new Constructor(BestEffortsDeliveryConfiguration.class)).loadAs(inputStreamReader, BestEffortsDeliveryConfiguration.class);
            new BestEffortsDeliveryJobFactory(config).init();
        }
    }

由原始碼可知,主配置檔案是config.yaml;將該檔案解析為BestEffortsDeliveryConfiguration,然後呼叫job工廠的配置初始化

config.yaml配置檔案

#事務日誌的資料來源.
targetDataSource:
  ds_0: !!org.apache.commons.dbcp.BasicDataSource
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/ds_0
    username: root
    password:
  ds_1: !!org.apache.commons.dbcp.BasicDataSource
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/ds_1
    username: root
    password:

#事務日誌的資料來源.
transactionLogDataSource:
  ds_trans: !!org.apache.commons.dbcp.BasicDataSource
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/trans_log
    username: root
    password:

zkConfig:
  #註冊中心的連線地址
  connectionString: localhost:2181
  
  #作業的名稱空間
  namespace: Best-Efforts-Delivery-Job
  
  #註冊中心的等待重試的間隔時間的初始值
  baseSleepTimeMilliseconds: 1000
  
  #註冊中心的等待重試的間隔時間的最大值
  maxSleepTimeMilliseconds: 3000
  
  #註冊中心的最大重試次數
  maxRetries: 3

jobConfig:
  #作業名稱
  name: bestEffortsDeliveryJob
  
  #觸發作業的cron表示式
  cron: 0/5 * * * * ?
  
  #每次作業獲取的事務日誌最大數量
  transactionLogFetchDataCount: 100
  
  #事務送達的最大嘗試次數.
  maxDeliveryTryTimes: 3
  
  #執行送達事務的延遲毫秒數,早於此間隔時間的入庫事務才會被作業執行
  maxDeliveryTryDelayMillis: 60000

BestEffortsDeliveryJobFactory核心原始碼:

public final class BestEffortsDeliveryJobFactory {
    //這個屬性賦值通過有參構造方法進行賦值,是通過`config.yaml`配置的屬性
    private final BestEffortsDeliveryConfiguration bedConfig;
    
    /**
     * Main中呼叫該init()方法,
     * Initialize best efforts delivery job.
     */
    public void init() {
        //根據config.yaml中配置的zkConfig節點,得到協調排程中心CoordinatorRegistryCenter
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(createZookeeperConfiguration(bedConfig));
        // 排程中心初始化
        regCenter.init();
        //構造elastic-job排程任務
        JobScheduler jobScheduler = new JobScheduler(regCenter, createBedJobConfiguration(bedConfig));
        jobScheduler.setField("bedConfig", bedConfig);
        jobScheduler.setField("transactionLogStorage", TransactionLogStorageFactory.createTransactionLogStorage(new RdbTransactionLogDataSource(bedConfig.getDefaultTransactionLogDataSource())));
        jobScheduler.init();
    }
    
    public ZookeeperConfiguration createZookeeperConfiguration(final BestEffortsDeliveryConfiguration bedConfig) {
        AsyncSoftTransactionZookeeperConfiguration zkConfig = bedConfig.getZkConfig();
        return new ZookeeperConfiguration(zkConfig.getConnectionString(), zkConfig.getNamespace(), zkConfig.getBaseSleepTimeMilliseconds(),
            zkConfig.getMaxSleepTimeMilliseconds(), zkConfig.getMaxRetries());
    }
    //建立BestEffortsDeliveryJob配置
    private JobConfiguration createBedJobConfiguration(final BestEffortsDeliveryConfiguration bedJobConfig) {
        // 根據config.yaml中配置的jobConfig節點得到job配置資訊,且指定job型別為BestEffortsDeliveryJob
        JobConfiguration result = new JobConfiguration(bedJobConfig.getJobConfig().getName(), BestEffortsDeliveryJob.class, 1, bedJobConfig.getJobConfig().getCron());
        result.setFetchDataCount(bedJobConfig.getJobConfig().getTransactionLogFetchDataCount());
        result.setOverwrite(true);
        return result;
    }
}

JOB的程式碼

public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> {
    
    @Setter
    private BestEffortsDeliveryConfiguration bedConfig;
    
    @Setter
    private TransactionLogStorage transactionLogStorage;
    
    @Override
    public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) {
        return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(), 
            bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis());
    }
    
    @Override
    public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {
        try (
            Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) {
            transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes());
        } catch (final SQLException | TransactionCompensationException ex) {
            log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1, 
                bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));
            return false;
        }
        return true;
    }
    
    @Override
    public boolean isStreamingProcess() {
        return false;
    }
}
呼叫 #fetchData() 方法獲取需要處理的事務日誌 (TransactionLog),內部呼叫了 TransactionLogStorage#findEligibleTransactionLogs() 方法
呼叫 #processData() 方法處理事務日誌,重試執行失敗的 SQL,內部呼叫了 TransactionLogStorage#processData()
#fetchData() 和 #processData() 呼叫是 Elastic-Job 控制的。每一輪定時排程,每條事務日誌只執行一次。當超過最大非同步呼叫次數後,該條事務日誌不再處理,所以生產使用時,最好增加下相應監控超過最大非同步重試次數的事務日誌。

參考:http://www.iocoder.cn/Sharding-JDBC/transaction-bed/

https://www.jianshu.com/p/0f1a938c9017

相關推薦

shardingjdbc ()-努力事務

一 序:Sharding-JDBC由於效能方面的考量,決定不支援強一致性分散式事務。目前支援的:Best efforts delivery transaction (已經實現).Try confirm cancel transaction (待定).最大努力送達型:在分散式資

分散式事務的典型處理方式:2PC、TCC、非同步確保和努力

1. 柔性事務和剛性事務 柔性事務滿足BASE理論(基本可用,最終一致) 剛性事務滿足ACID理論 本文主要圍繞分散式事務當中的柔性事務的處理方式進行討論。 柔性事務分為 兩階段型補償型非同步確保型最大努力通知型幾種。 由於支付寶整個架構是SOA架構,因此

分散式事務解決方案(四)【努力通知】

4. 最大努力通知方案(定期校對) 4.1 介紹 實現 業務活動的主動方,在完成業務活動處理後,向業務活動被動方傳送訊息,允許訊息丟失 業務活動的被動方根據定時策略,向業務活動的主動方查詢,恢復丟失的業務訊息 約束:被動方的業務處理結果不影響主動

微服務開發的痛點-分散式事務SEATA入門簡介

![](https://james-1258744956.cos.ap-shanghai.myqcloud.com/%E5%BE%AE%E6%9C%8D%E5%8A%A1%E6%9C%80%E5%A4%A7%E7%9A%84%E5%BC%80%E5%8F%91%E7%97%9B%E7%82%B9-%E5%88

MySQL TEXT數據類長度

www hang 內容 就會 最大 imu have chang 類型 TINYTEXT 256 bytes TEXT 65,535 bytes ~64kb MEDIUMTEXT 16,777,215 bytes ~16MB

RQNOJ 311 [NOIP2000]乘積:劃分dp

main print sed 編號 開始 ios std [0 否則 題目鏈接:https://www.rqnoj.cn/problem/311 題意:   給你一個長度為n的數字,用t個乘號分開,問你分開後乘積最大為多少。(6<=n<=40,1<=k&l

求一個類值和小值

最大 limit std min fin ons con n) define #include <stdio.h> #include <limits.h> #define MYMAX(T) \ (- ( ( 1<< ( s

[LeetCode] Largest Plus Sign 的加符號

marked down malle ogr them mit min one demo In a 2D grid from (0, 0) to (N-1, N-1), every cell contains a 1, except those cells in

springboot~為Money類添加值和小值的註解校驗

status ret 大於 public http money string uil lin 在spring框架裏,為我們集成了很多校驗註解,直接在字段上添加對應的註解即可,這些註解基本都是簡單保留類型的,即int,long,float,double,String等,而如果

Checked exceptions: Java’s biggest mistake-檢查異常:Java的錯誤(翻譯)

lsb ++ 好的 stream abstract throw features inter 不用 原文地址:http://literatejava.com/exceptions/checked-exceptions-javas-biggest-mistake/ 僅供參考,

為什麼Java中的float值大於long

文章目錄 問題:float是32位,long是64位,為什麼float表示的範圍比long大呢? 參考 問題:float是32位,long是64位,為什麼float表示的範圍比long大呢? 原因: 原因是,float與lo

C++之bool型別,名稱空間的練習——使用一個函式找出一個整陣列中的值或小值

#include<iostream> using namespace std; int findMaxOrMin(int * n,int number) { int temp=n[0]; bool isMax; cin>>isMax; for(int i=1;

將整陣列轉化為一個字串

給定一個任意長度的整型陣列,求陣列內能組合出來的最大字串(整數)? 例如陣列:{ 323, 32, 1, 9569, 4, 6, 92, 636, 63, 998 } 將其新增到List排序後

C語言的整溢位問題 int、long、long long取值範圍 小值

《C和指標》中寫過:long與int:標準只規定long不小於int的長度,int不小於short的長度。 double與int型別的儲存機制不同,long int的8個位元組全部都是資料位,而double是以尾數,底數,指數的形式表示的,類似科學計數法,因此double比i

常用整變數的取值範圍

#include<iostream> #include<math.h> #include<limits.h> using namespace std; int main() { // 2^16 = 32768 : unsigned sh

程式設計師面試100題之 求子陣列的

                        題目:輸入一個整形陣列,數組裡有正數也有負數。陣列中連續的一個或多個整陣列成一個子陣列,每個子陣列都有一個和。求所有子陣列的和的最大值。要求時間複雜度為O(n)。       例如輸入的陣列為1, -2, 3, 10, -4, 7, 2, -5,和最大的子陣列為

MySQL復雜查詢:連接查詢+取某個類

img 記得 有一個 排名 教育 tip ogl 左連接 就是 本文鏈接:https://blog.inchm.cn/default/38.html 需求 假設有一個考試,比如CET(包括CET-4和CET-6),學生可以多次報考刷分。現在某教育單位要從考試結果中把每個

程式設計師程式設計藝術-----第二十八 ~ 二十章-----連續乘積子串、字串編輯距離

               第二十八~二十九章:最大連續乘積子串、字串編輯距離前言    時間轉瞬即逝,一轉眼,又有4個多月沒來更新blog了,過去4個月都在幹啥呢?對的,今2013年元旦和朋友利用業餘時間一起搭了個方便朋友們找工作的程式設計面試演算法論壇:為學論壇http://www.51weixue.c

找出兩個int變數的值和小值,不使用if/:?/switch判斷語句

方法一: Max=(a+b+|a-b|)/2; Max=(a+b-|a-b|)/2; 方法二:通過加減運算和移位運算相結合 Min = a+(((b-a)>>31)&(b-a)); Max = a-(((a-b)>>31)&(a-b

機器學習筆記(十)——熵原理和模型定義

一、最大熵原理     最大熵原理是概率模型學習的一個準則。最大熵原理認為,在學習概率模型時,在所有可能的概率分佈中,熵最大的模型是最好的模型。通常用約束條件來確定概率模型的集合,所以,最大熵模型也可以表述為在滿足約束條件的模型集合中選取熵最大的模型。