1. 程式人生 > >消費者實現應用內分布式事務

消費者實現應用內分布式事務

必須 mount 過程 copy clust 下載 加載 cond time

通過《消費者實現應用內分布式事務》、《生產者實現應用內分布式事務管理》、《實現持久訂閱消費者》三個章節的實踐,其實我們已經可以通過消息隊列實現多應用的分布式事務,應用內的事務保證了消息不會被重復生產消費、持久化訂閱保證了消息一定會被消費(進入死信隊列特殊處理),但其對於業務來說耦合性還是太強,在進行業務處理的同時消息處理名,其采用的仍然是應用內的事務處理,並不適合在大型高性能高並發系統實踐,那麽本章將通過本地事務+消息隊列+外部事件定義表+定時任務實現解耦。
(目前主要實現微服務下最終一致性事務的方案主要是:可靠事件;補償模式;TCC(Try-Confirm-Cancel);三種,本案例為可靠事件方式)

場景描述:
本場景就拿最常用的轉賬業務闡述:
在工行ICBC有賬號Card001,其中存於500元;
在中行BOC有賬號Card002,其中也存有500元;
此時從Card001賬號轉賬至Card002賬號300元。

系統設計:
工行系統ICBCPro,該工程主要實現兩個功能(實現轉出金額生成轉賬事件;定時任務發出轉賬事件至消息隊列),主要參考《生產者實現應用內分布式事務管理》實現;
中行系統BOCPro,該工程主要實現兩個功能(從消息隊列下載轉賬事件;定時任務對轉賬事件處理並更新轉入賬號金額),主要參考《消費者實現應用內分布式事務》實現;
此場景僅需要通過P2P消息模式即可。

構建ICBCPro工程
A、實現轉出金額生成轉賬事件
1、構建數據庫相關表以及基礎數據:
轉出賬號數據

轉出事件記錄

消息隊列

消息控制臺

2、執行單元測試代碼實現轉賬,此時賬戶扣除與轉賬事件記錄均在本地事務內:
//ICBC中賬戶card001轉出300元

[java] view plain copy
@Test
public void tranfer(){
EventLog eventLog = new EventLog();
eventLog.setAmount(new BigDecimal(300));
eventLog.setFromcard("card001");
eventLog.setTocard("card002");
eventLog.setEventstate(EventState.NEW);
eventLog.setTransferDate(new Date());
eventLogService.transfer(eventLog,new BigDecimal(300));
}

賬戶信息:

事件記錄


B、定時任務發出轉賬事件至消息隊列
對於事件記錄表,我們可以定義一個定時任務,將所有的NEW狀態事件全部發出,此時需要保證消息的可靠性,采用XA事務實現,但已經不影響我們業務的響應了,實現解耦、快速響應,下面貼出核心實現代碼:
1、首選實現數據排它鎖場景下的查詢與更新:
[java] view plain copy
/**
* 在排它鎖場景下數據更新,保證數據的可靠性
*/
@Override
public void updateEventstateById(String id, EventState eventState) {
EventLog eventLog=findEventLog4Update(id);
eventLog.setEventstate(eventState);
emJ1.merge(eventLog);
}

/**
* 實現排它鎖查詢
*/
@Override
public EventLog findEventLog4Update(String id){
EventLog eventLog=emJ1.find(EventLog.class, id, LockModeType.PESSIMISTIC_WRITE);
return eventLog;
}

2、在service定義查詢所有NEW狀態的事件、並采用XA事務管理NEW狀態事件的發送與更新(為了驗證了事務生效,設定了一個fromcard為空的數據觸發異常),在異常情況下我們也需要保證countDownLatch執行,避免線程阻塞:
[java] view plain copy
@Service
public class EventLogService www.shashuiyule.cn{
@Autowired
private EventLogRepository eventLogRepository;
@Resource(name="jmsQueueMessagingTemplate")
private JmsMessagingTemplate jmsQueueMessagingTemplate;
@Autowired
@Qualifier("icbc2boc")
private Queue icbc2boc;
....

/**
* 根據eventstate獲取EventLog數據集
* @param eventstate
* @return
*/
@Transactional(transactionManager="transactionManager1",propagation=Propagation.SUPPORTS,readOnly=true)
public List<EventLog> findByEventState(EventState eventstate){
return eventLogRepository.findByEventstate(eventstate);
}

/**
* XA事務
* @param id
* @param eventstate
*/
@Transactional(transactionManager="transactionManagerJTA",propagation=Propagation.REQUIRES_NEW)
public void transferToMQ(EventLog eventLog,EventState eventstate,CountDownLatch countDownLatch){
try {
System.out.println(Thread.currentThread().getName()+"本次處理數據:"+eventLog.getFromcard(www.ctxpty.com)+"、"+eventLog.getEventstate());
//再次數據庫查詢判斷,此時用到排它鎖--在兩個定時任務連續執行,一旦出現程序提交事務命令至數據庫,
//但數據庫還未執行,此時我們全表查詢的結果中當前數據行仍為修改前數據,故會造成重復消費
eventLog=eventLogRepository.findEventLog4Update(eventLog.getId());
if(EventState.Publish.equals(eventLog.getEventstate())){
System.out.println(Thread.currentThread().getName()+"數據:"+eventLog.getFromcard()+"無需處理");
return;
}
//payload
jmsQueueMessagingTemplate.convertAndSend(icbc2boc,eventLog);
eventLogRepository.updateEventstateById(eventLog.getId(), eventstate);
//構造異常場景驗證XA事務
if(eventLog.getFromcard()==null){
System.out.println(Thread.currentThread().getName()+"數據異常,不處理");
System.out.println(1/0);
}else{
System.out.println(Thread.currentThread().getName()+":"+eventLog.getFromcard()+"數據處理成功");
}
} finally {
countDownLatch.countDown();
}
}
}
3、定義Job,實現轉出任務,並通過線程池異步處理待處理事件集合,通過並發提高處理性能,通過countDownLatch保證了每個任務所有線程處理完成後啟動下一次任務;
[html] view plain copy
/**
* 轉出任務
* @author song
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution //保證每次任務執行完畢,設置為串行執行
public class TransferJob extends QuartzJobBean {
private Logger logger=LoggerFactory.getLogger(TransferJob.class);
@Autowired
@Qualifier("quartzThreadPool")
private ThreadPoolTaskExecutor quartzThreadPool;
@Autowired
private EventLogService eventLogService;

@Override
protected void executeInternal(ww.hjshidpt.com JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.info("本次批處理開始");
//獲取所有未發送狀態的Event
List<EventLog> list=eventLogService.findByEventState(EventState.NEW);
//
final CountDownLatch countDownLatch=new CountDownLatch(list.size());

//遍歷發送
for(final EventLog eventLog:list){
//通過線程池提交任務執行,大大提高處理集合效率
quartzThreadPool.submit(new Runnable() {

@Override
public void run() {
eventLogService.transferToMQ(eventLog,EventState.Publish,countDownLatch);
}
});
}

//保證所有線程執行完成後退出
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("本次批處理完成");
}
}
4、定義轉出任務、觸發器、調度器以及處理線程池:
[html] view plain copy
@Bean(name="tranferJob")
public JobDetailFactoryBean tranferJob(www.dashuju178.com){
JobDetailFactoryBean factoryBean=new JobDetailFactoryBean();
//定義任務類
factoryBean.setJobClass(TransferJob.class);
//表示任務完成之後是否依然保留到數據庫,默認false
factoryBean.setDurability(true);
//為Ture時當Quartz服務被中止後,再次啟動或集群中其他機器接手任務時會嘗試恢復執行之前未完成的所有任務,默認false
factoryBean.setRequestsRecovery(true);
return factoryBean;
}

/**
* 註冊job1的觸發器
* @return
*/
@Bean(name="transferJobTrigger")
public CronTriggerFactoryBean transferJobTrigger(){
//觸發器
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setCronExpression("*/5 * * * * ?");
cronTriggerFactoryBean.setJobDetail(tranferJob().getObject());
//調度工廠實例化後,經過5秒開始執行調度
cronTriggerFactoryBean.setStartDelay(30000);
cronTriggerFactoryBean.setGroup("tranfer");
cronTriggerFactoryBean.setName("tranfer");
return cronTriggerFactoryBean;
}

/**
* 調度工廠,加載觸發器,並設置自動啟動、啟動時延
* @return
*/
@Bean(name="transferSchedulerFactoryBean")
public SchedulerFactoryBean transferSchedulerFactoryBean(){
//調度工廠
SchedulerFactoryBean schedulerFactoryBean= new SchedulerFactoryBean();
schedulerFactoryBean.setConfigLocation(new ClassPathResource("quartz.properties"));
schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContextKey");
//集群Cluster下設置dataSource
// schedulerFactoryBean.setDataSource(dataSource);
//QuartzScheduler啟動時更新己存在的Job,不用每次修改targetObject後刪除qrtz_job_details表對應記錄了
schedulerFactoryBean.setOverwriteExistingJobs(true);
//QuartzScheduler延時啟動20S,應用啟動完後 QuartzScheduler 再啟動
schedulerFactoryBean.setStartupDelay(20);
//自動啟動
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.setTriggers(transferJobTrigger().getObject());
//自定義的JobFactory解決job中service的bean註入
schedulerFactoryBean.setJobFactory(jobFactory);
return schedulerFactoryBean;
}

/**
* 用於處理待轉賬數據發至消息隊列的線程池
* @return
*/
@Bean(name="quartzThreadPool")
public ThreadPoolTaskExecutor getThreadPoolTaskExecutor(){
ThreadPoolTaskExecutor pool=new ThreadPoolTaskExecutor();
pool.setCorePoolSize(10);
pool.setQueueCapacity(100);
pool.setMaxPoolSize(10);
pool.setKeepAliveSeconds(10);
//避免應用關閉,任務沒有執行完成,起到shutdownhook鉤子的作用
pool.setWaitForTasksToCompleteOnShutdown(true);
//空閑時核心線程也不退出
pool.setAllowCoreThreadTimeOut(false);
//設置拒絕策略,不可執行的任務將被拋棄
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return pool;
}

小結
特別註意:
1、周期時間剛好兩個定時任務連續執行,出現java程序提交事務緊接第二個任務啟動,但數據庫未完成命令,此時後續任務已經查詢數據,全表過濾能夠再次獲取未提交數據行原始數據,會造成二次消費,故需要對其采用排它鎖方式,二次查詢判斷後決定是否消費,從而規避二次消費問題;
2、註意在Service中不能隨便catch異常,避免分布式事務未回滾,造成重復消費;
3、通過CountDownLatch,實現任務線程等待所有的子任務線程執行完畢後方可退出本次任務,執行下一個任務,故其一定要在finally中實現countdown,避免造成任務線程阻塞;
4、需要設置OpenEntityManagerInViewInterceptor攔截器,避免提示session過早關閉問題;
5、數據庫DataSource必須定義好destroyMethod,避免程序關閉,事務還未提交的情況下出現連接池已經關閉;
6、設置好連接池需要等待已提交任務完成後方可shutdown;

優化空間:
1、根據數據特征進行任務分割,比如自增ID場景下,根據0、1、2等最後一位尾數分割不同的定時任務,配置任務集群,從而實現分布式高可用集群處理;
2、在數據查詢處理過程中,優化sql,提高單次查詢性能;
3、添加獨立的定時任務,將Publish已消費數據轉儲,減輕單表壓力;
4、目前已經加入線程池異步處理數據集合,提高單次任務執行效率;
5、一旦數據庫壓力比較大的情況下,也可以將Event分庫操作,減輕服務器數據庫連接、IO壓力;
6、采用微服務模式,將兩個功能實現服務分離;
7、也可以在定時任務中添加比如50MS的sleep時長,保證數據庫服務器端事務提交成功,取消排它鎖將進一步提高性能較小數據庫死鎖問題;

遺留問題:
1、在開發環境下,手動關閉程序MQ連接會過早關閉,修改數據後事務未提交,出現MySQL數據庫行已經被執行排他鎖;

構建BOCPro工程
A、從消息隊列下載轉賬事件
1、構建數據庫BOC數據庫相關表以及基礎數據:

事件表暫時為空

消息隊列,有一條轉賬數據


2、配置隊列消息模板:
[html] view plain copy
@Configuration
public class JmsMessageConfiguration {
@Autowired
@Qualifier(value="jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;

/**
* 定義點對點隊列
* @return
*/
@Bean(name="icbc2boc")
public Queue queue() {
return new ActiveMQQueue("icbc2boc");
}

/**
* 創建處理隊列消息模板
* @return
*/
@Bean(name="jmsQueueMessagingTemplate")
public JmsMessagingTemplate jmsQueueMessagingTemplate() {
JmsMessagingTemplate jmsMessagingTemplate =new JmsMessagingTemplate(jmsQueueTemplate);
//通過MappingJackson2MessageConverter實現Object轉換
jmsMessagingTemplate.setMessageConverter(new MappingJackson2MessageConverter());
return new JmsMessagingTemplate(jmsQueueTemplate);
}

}
3、配置監聽器,監聽轉賬事件消息:
[html] view plain copy
@Component
public class TraferIn {

@Autowired
@Qualifier("icbc2boc")
private Queue queue;
@Autowired
@Qualifier("jmsQueueMessagingTemplate")
private JmsMessagingTemplate jmsQueueMessagingTemplate;
@Autowired
private EventLogService eventLogService;

/**
* 定義監聽轉賬事件監聽
* @param text
* @throws Exception
*/
@JmsListener(destination = "icbc2boc",containerFactory="jmsListenerContainerFactory4Queue")//ActiveMQ.DLQ
public void receiveQueue(EventLog eventLog) throws Exception {
System.out.println("接受到的事件數據:"+eventLog.toString());
eventLogService.mq2transfer(eventLog, new BigDecimal(300));
}
}

4、采用分布式事務管理下載的消息隊列事件,模擬事務失效,驗證成功:
[html] view plain copy
/**
* XA事務
* @param eventLog
* @param amount
*/
@Transactional(transactionManager="transactionManagerJTA",propagation=Propagation.REQUIRED)
public void mq2transfer(EventLog eventLog,BigDecimal amount){
//保存事件日誌
eventLogRepository.saveEvetLog(eventLog);
// System.out.println(1/0);
}
5、需要采用XA事務,故我們不能直接通過EventLogRepository保存數據,定義自定義保存方法:
[html] view plain copy
/**
* 采用分布式事務數據源保存事件
*/
@Override
public EventLog saveEvetLog(EventLog eventLog) {
return emJ1.merge(eventLog);
}
6、啟動程序監聽後,收到事件

數據庫添加了一條NEW狀態事件

消費後消息隊列被清空

B、定時任務對轉賬事件處理並更新轉入賬號金額

消費者實現應用內分布式事務