1. 程式人生 > >Java併發任務處理之Executor執行緒池

Java併發任務處理之Executor執行緒池

乾貨

import org.junit.After;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolTest {

    private ExecutorService executorService;

    /**
     * 監聽至所有執行緒結束
     */
    @After
    public void terminated() {
        executorService.
shutdown(); while(true) { if(executorService.isTerminated()) { return; } else { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } }
} } /** * 快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。 */ @Test public void testCachedThreadPool() { executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { executorService.execute(new Runnable() { @Override
public void run() { System.out.println("Runing..."); } }); } } /** * 定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。 */ @Test public void testFixedThreadPool() { executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 50; i++) { executorService.execute(new Runnable() { @Override public void run() { System.out.println("Runing..."); } }); } } /** * 單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。 */ @Test public void testSingleThreadExecutor() { executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 50; i++) { executorService.execute(new Runnable() { @Override public void run() { System.out.println("Runing..."); } }); } } }

應用場景

現有一業務,對資料庫內使用者賬戶資訊更新,需要遍歷10k+Ethereum地址,並訪問geth客戶端,獲取地址內的餘額。專案內使用QuartzJob定時執行該這一業務,程式碼:

public class BalanceJob extends QuartzJobBean {
    private static final Logger logger = LoggerFactory.getLogger(BalanceJob.class);

    @Autowired
    private AccountMapper accountMapper;

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        logger.info("==>Update balance {}", new Date());
        long start = System.currentTimeMillis();
        AccountExample accountExample = new AccountExample();
        AccountExample.Criteria accountExampleCriteria = accountExample.createCriteria();
        accountExampleCriteria.andIsDeleteEqualTo(false);
        List<Account> accountList = accountMapper.selectByExample(accountExample);
        List<Account> recordList = new ArrayList<>(accountList.size());
        logger.info("==>record amount {}", accountList.size());
        WalletUtil.getWeb3j();
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        accountList.forEach(account -> {
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    Crypto crypto = account.getCrypto();
                    Coin coin = account.getCoin();
                    if (null == crypto || null == coin) {
                        return;
                    }
                    BigDecimal balanceOri = account.getBalance();
                    String address = crypto.getAddress();
                    String contract = coin.getContract();
                    String symbol = coin.getSymbol();
                    int decimals = coin.getDecimals();
                    BigDecimal balance = BigDecimal.ZERO;
                    if ("ETH".equalsIgnoreCase(symbol)) {
                        try {
                            balance = WalletUtil.getBalance(address);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } else if (StringUtils.isNotBlank(contract)) {
                        try {
                            balance = WalletUtil.getBalance(address, contract, decimals);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } else {
                        logger.error("==>Coin is not exist : ", symbol);
                        return;
                    }
                    if (BigDecimal.ZERO.compareTo(balance) != 0 && balance.compareTo(balanceOri) != 0) {
                        Account record = new Account.Builder().id(account.getId()).balance(balance).build();
                        recordList.add(record);
                    }
                }
            });
        });
        cachedThreadPool.shutdown();
        while (true) {
            if (cachedThreadPool.isTerminated()) {
                break;
            } else {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        if (recordList.size() > 0) {
            accountMapper.batchUpdateBalanceById(recordList);
        }
        long end = System.currentTimeMillis();
        logger.info("<==Update balance end. took {}ms", end - start);
    }
}

使用CachedThreadPool處理getBalance方法。優勢:16k條記錄可以在6000ms左右處理完畢;劣勢:與需求“與生俱來”,增加以太坊RPC客戶端負荷。