1. 程式人生 > >多執行緒異常 和 事務(二)

多執行緒異常 和 事務(二)

1.接著上一篇程式碼變形一下

首先我們在上一篇文章的基礎上把程式碼變形的面向物件一些

package com.wei.service.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class MultiThreadExceptionAndReturnValSHow {

	public static void main(String[] args) {
		final ThreadFactory threadFactory = new ThreadFactoryBuilder()
				.setNameFormat("Orders-%d-Thread:").build();

		int minPoolSize = 5;
		int maxPoolSize = 10;

		//SynchronousQueue同步佇列容量0,就是說執行緒池沒有等待佇列
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
				minPoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>(), threadFactory);

		//測試一個Runnable
		for (int i = 0; i < 5; i++) {
			ExceptionRunThread thread = new ExceptionRunThread();
			threadPoolExecutor.submit(thread);
		}
		//測試一個callable
		for (int i = 0; i < 5; i++) {
			ExceptionCallableThread call = new ExceptionCallableThread();
			threadPoolExecutor.submit(call);
		}
		//測試多個callable
		List<Callable<String>> callList = new ArrayList<Callable<String>>();
		for (int i = 0; i < 5; i++) {
			ExceptionCallable call = new ExceptionCallable();
			callList.add(call);
		}
		invokeAllTask(threadPoolExecutor, callList);
		//等待任務完成後,關閉執行緒池
		threadPoolExecutor.shutdown();
	}

	private static <T> List<T> invokeAllTask(
			ThreadPoolExecutor threadPoolExecutor, List<Callable<T>> callList) {
		List<Future<T>> futureAll = null;
		List<T> resultList = new ArrayList<T>();
		try {
			futureAll = threadPoolExecutor.invokeAll(callList);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		AtomicInteger in = new AtomicInteger(1);
		for (Future<T> f : futureAll) {
			try {
				T result = f.get();
				System.out.println("count:" + in.getAndIncrement() + ":"
						+ result);
				resultList.add(result);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		return resultList;
	}

}

class ExceptionRunThread implements Runnable {

	@Override
	public void run() {
		try {
			System.out.println(Thread.currentThread().getName()
					+ "do something start......");
			if (Thread.currentThread().getName().contains("2")) {
				throw new RuntimeException("測試 uncheck exception");
			}
			Thread.sleep(100);// do something
			System.out.println(Thread.currentThread().getName()
					+ "do something end......");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

class ExceptionCallableThread implements Callable<String> {

	@Override
	public String call() {
		try {
			System.out.println(Thread.currentThread().getName()
					+ "do something start......");
			if (Thread.currentThread().getName().contains("2")) {
				throw new RuntimeException("測試 uncheck exception");
			}
			Thread.sleep(100);// do something
			System.out.println(Thread.currentThread().getName()
					+ "do something end......");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return Thread.currentThread().getName();
	}
}


這裡面有有一個重要的地方,就是執行緒的命名,我們藉助google執行緒工廠的一個類,執行緒的命名很重要,大家可以把執行緒數量修改到20,
Run結果

Orders-0-Thread:do something start......
Orders-1-Thread:do something start......
Orders-2-Thread:do something start......
Orders-3-Thread:do something start......
Orders-4-Thread:do something start......
Orders-5-Thread:do something start......
Orders-6-Thread:do something start......
Orders-7-Thread:do something start......
Orders-8-Thread:do something start......
Orders-9-Thread:do something start......
Orders-10-Thread:do something start......
Orders-11-Thread:do something start......
Orders-12-Thread:do something start......
Orders-13-Thread:do something start......
Orders-14-Thread:do something start......
Orders-0-Thread:do something end......
Orders-1-Thread:do something end......
Orders-4-Thread:do something end......
Orders-3-Thread:do something end......
Orders-6-Thread:do something end......
Orders-5-Thread:do something end......
Orders-8-Thread:do something end......
Orders-7-Thread:do something end......
Orders-9-Thread:do something end......
Orders-14-Thread:do something end......
Orders-11-Thread:do something end......
Orders-10-Thread:do something end......
Orders-13-Thread:do something end......
count:1:Orders-10-Thread:
count:2:Orders-11-Thread:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 測試 uncheck exception
	at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source)
	at java.util.concurrent.FutureTask.get(Unknown Source)
	at com.wei.service.impl.MultiThreadExceptionAndReturnValSHow.invokeAllTask(MultiThreadExceptionAndReturnValSHow.java:63)
	at com.wei.service.impl.MultiThreadExceptionAndReturnValSHow.main(MultiThreadExceptionAndReturnValSHow.java:46)
Caused by: java.lang.RuntimeException: 測試 uncheck exception
	at com.wei.service.impl.ExceptionCallable.call(MultiThreadExceptionAndReturnVal.java:138)
	at com.wei.service.impl.ExceptionCallable.call(MultiThreadExceptionAndReturnVal.java:1)
	at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
count:3:Orders-13-Thread:
count:4:Orders-14-Thread:

輸出不難理解,執行緒2和12執行緒都沒有正常結束,而執行緒12正好執行的是有返回值的所以count:5:orders-12-Thread是異常的。

2下面我們整合到spring中去看,有了上面的程式碼變形,那麼整合到spring中會比較容易

先看執行緒池業務介面類
package com.wei.service;

import java.util.List;
import java.util.concurrent.Callable;

import com.wei.service.facade.vo.CommonResonse;

public interface MultiThreadTaskExecutorService {

	public <T> CommonResonse<T> invokeAll(List<Callable<T>> taskList);
}
實現類,我們可以看到執行緒執行器被包在了這個裡面
package com.wei.service.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.wei.service.MultiThreadTaskExecutorService;
import com.wei.service.facade.vo.CommonResonse;


public class MultiThreadTaskExecutorServiceImpl implements
		MultiThreadTaskExecutorService, InitializingBean, DisposableBean {

	private Logger logger = LoggerFactory.getLogger(this.getClass());
	private ThreadPoolExecutor threadPoolExecutor;
	private int minPoolSize;
	private int maxPoolSize;

	@Override
	public <T> CommonResonse<T> invokeAll(List<Callable<T>> taskList) {
		try {
			List<Future<T>> futures = threadPoolExecutor.invokeAll(taskList);
			List<T> resultList = new ArrayList<T>();
			for (Future<T> future : futures) {
				try {
					resultList.add(future.get());
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
			}
			logger.info("result={}",JSON.toJSONString(resultList));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return null;
	}

	@Override
	public void destroy() throws Exception {
		threadPoolExecutor.shutdown();
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		final ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("Orders-%d")
        .build();
		threadPoolExecutor = new ThreadPoolExecutor(minPoolSize, maxPoolSize,
				60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),threadFactory);
	}

	public int getMinPoolSize() {
		return minPoolSize;
	}

	public void setMinPoolSize(int minPoolSize) {
		this.minPoolSize = minPoolSize;
	}

	public int getMaxPoolSize() {
		return maxPoolSize;
	}

	public void setMaxPoolSize(int maxPoolSize) {
		this.maxPoolSize = maxPoolSize;
	}

}


xm裡bean的配置
	<bean id="multiThreadTaskExecutorService"
		class="com.wei.service.impl.MultiThreadTaskExecutorServiceImpl">
		<property name="minPoolSize" value="5" />
		<property name="maxPoolSize" value="10" />
	</bean>

aop事務的配置
<tx:advice id="txAdvice" transaction-manager="transactionManager">
		<tx:attributes>
			<tx:method name="*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
		</tx:attributes>
	</tx:advice>
	<aop:config>
		<aop:pointcut id="servicePiontCut"
			expression="execution(* com.wei.service..*(..))" />
		<aop:advisor advice-ref="txAdvice" pointcut-ref="servicePiontCut" />
	</aop:config>
在看業務介面類
package com.wei.service;

import java.util.List;
import java.util.concurrent.Callable;

import com.wei.dao.entity.User;


public interface CallTaskService extends Callable<List<User>>{

	List<User> call();
}
package com.wei.service;

import java.util.List;
import java.util.concurrent.Callable;

import com.wei.dao.entity.User;


public interface CallTask2Service extends Callable<List<User>>{

	List<User> call();
}


業務介面實現
package com.wei.service.impl;

import java.util.List;
import java.util.concurrent.Callable;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.wei.dao.entity.User;
import com.wei.dao.mapper.UserMapper;
import com.wei.service.CallTaskService;

@Service
public class CallTaskServiceImpl implements CallTaskService ,Callable<List<User>>{


	@Autowired
	UserMapper userMapper;
	
	@Override
	public List<User> call() {
		return userMapper.select(null);
	}
}

下面是依賴這幾個介面的測試類,介面沒有寫大家自己補一下
package com.wei.service.impl;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;

import org.apache.ibatis.session.RowBounds;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSON;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.wei.dao.entity.User;
import com.wei.dao.mapper.UserMapper;
import com.wei.service.CallTask2Service;
import com.wei.service.CallTaskService;
import com.wei.service.MultiThreadTaskExecutorService;
import com.wei.service.UserService;
import com.wei.service.vo.UserVo;

@Service
public class UserServiceImpl implements UserService{

	@Autowired
	UserMapper userMapper;
	@Autowired
	CallTask2Service callTask2Service;
	@Autowired
	CallTaskService callTaskService;
	@Autowired
	MultiThreadTaskExecutorService multiThreadTaskExecutorService;
	
	@Override
	public void testThreadRollBack(UserVo userVo) {
		List<Callable<List<User>>> taskList=new ArrayList<Callable<List<User>>>();
//		taskList.add(new Callable<List<User>>() {
//			@Override
//			public List<User> call() throws Exception {
//				System.out.println("---------->"+"select");
//				return userMapper.select(null);
//			}
//		});
//		taskList.add(new Callable<List<User>>() {
//			@Override
//			public List<User> call() throws Exception {
//				User user=new User();
//				user.setCreateDate(new Date());
//				SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//				user.setId("rollbank20160421-"+sdf.format(new Date()));
//				user.setName("rollback"+sdf.format(new Date()));
//				user.setPassword("2222222");
//				userMapper.insert(user);
//				System.out.println("---------->"+"insert and select");
//				userMapper.select(null);
//				int i=1/0;
//				return userMapper.select(null);
//			}
//		});
		taskList.add(callTask2Service);
		taskList.add(callTaskService);
		multiThreadTaskExecutorService.invokeAll(taskList);
//		User user=new User();
//		user.setCreateDate(new Date());
//		SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//		user.setId("rollbank20160421-"+sdf.format(new Date()));
//		user.setName("rollback"+sdf.format(new Date()));
//		user.setPassword("11111111");
//		userMapper.insert(user);
		int i=1/0;
	}

}

通過測試,結果我就不貼出來了,我們可以看到只要是spring託管的bean切面的事務就是起作用的,new的就沒有事務,runnable和callable也一樣,new出來的匿名內部類spring不管理他的事務,因為他沒有交給spring託管。想開啟事務要自己編碼。 另外執行緒之間也是沒有影響的,一個執行緒異常結束,不會影響其他執行緒,仔細想想道理都知道,還是做實驗區印證了下。再重複下,只有spring託管的bean才會去管理事務,自己new出來的不管理事務,相信自己學的,重要的說三遍,標紅吧。