批量任務體現多執行緒的威力!
對於多執行緒的理解不是非常深刻,工作中用到多執行緒程式碼的機會也不多,前不久遇到了一個使用場景,通過編碼實現後對於多執行緒的理解和應用有了更加深刻的理解。場景如下:現有給使用者傳送產品調研的需求,運營的同事拿來了一個Excel檔案,要求給Excel裡面大約六萬個手機號傳送調研簡訊。
最簡單的方法就是一個迴圈然後單執行緒順序傳送,但是核心問題在於,給簡訊運營商發簡訊的介面響應時間較長,假設平均100ms的響應時間,那麼單執行緒傳送的話需要6萬*0.1秒=6000秒。顯然這個時間是不能接受的,運營商系統的傳送介面我們是不能優化的,只得增強自己的傳送和處理能力才能儘快的完成任務。
批量發簡訊
讀取Excel中的資訊
包依賴
工具類程式碼,Maven中引入如下兩個包
<dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-ooxml</artifactId> <version>3.17</version> </dependency> <dependency> <groupId>org.apache.xmlbeans</groupId> <artifactId>xmlbeans</artifactId> <version>2.6.0</version> </dependency> 複製程式碼
讀取Excel的工具類程式碼
/** * 讀取Excel的檔案資訊 * * @param fileName */ public static void readFromExcel(String fileName) { InputStream is = null; try { is = new FileInputStream(fileName); XSSFWorkbook workbook = new XSSFWorkbook(is); XSSFSheet sheet = workbook.getSheetAt(0); int num = 0; // 迴圈行Row for (int rowNum = 0, lastNum = sheet.getLastRowNum(); rowNum <= lastNum; rowNum++) { XSSFRow row = sheet.getRow(rowNum); String phoneNumber = getStringValueFromCell(row.getCell(0)).trim(); phoneList.add(phoneNumber); } System.out.println(num); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 讀取Excel裡面Cell內容 * * @param cell * @return */ private static String getStringValueFromCell(XSSFCell cell) { // 單元格內的時間格式 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); // 單元格內的數字型別 DecimalFormat decimalFormat = new DecimalFormat("#.#####"); // 單元格預設為空 String cellValue = ""; if (cell == null) { return cellValue; } // 按型別讀取 if (cell.getCellType() == XSSFCell.CELL_TYPE_STRING) { cellValue = cell.getStringCellValue(); } else if (cell.getCellType() == XSSFCell.CELL_TYPE_NUMERIC) { // 日期轉為時間形式 if (DateUtil.isCellDateFormatted(cell)) { double d = cell.getNumericCellValue(); Date date = DateUtil.getJavaDate(d); cellValue = dateFormat.format(date); } else { // 其他轉為數字 cellValue = decimalFormat.format((cell.getNumericCellValue())); } } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BLANK) { cellValue = ""; } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BOOLEAN) { cellValue = String.valueOf(cell.getBooleanCellValue()); } else if (cell.getCellType() == XSSFCell.CELL_TYPE_ERROR) { cellValue = ""; } else if (cell.getCellType() == XSSFCell.CELL_TYPE_FORMULA) { cellValue = cell.getCellFormula().toString(); } return cellValue; } 複製程式碼
模擬運營商傳送簡訊的方法
/** * 外部介面耗時長,通過多執行緒增強 * * @param userPhone */ public void sendMsgToPhone(String userPhone) { try { Thread.sleep(SEND_COST_TIME); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("send message to : " + userPhone); } 複製程式碼
多執行緒發簡訊
簡單的單執行緒傳送
/** * 單執行緒傳送 * * @param phoneList * @return */ private long singleThread(List<String> phoneList) { long start = System.currentTimeMillis(); /*// 直接主執行緒執行 for (String phoneNumber : phoneList) { threadOperation.sendMsgToPhone(phoneNumber); }*/ SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(phoneList); smet.start(); long totalTime = System.currentTimeMillis() - start; System.out.println("單執行緒傳送總時間:" + totalTime); return totalTime; } 複製程式碼
對於大批量發簡訊的場景,如果使用單執行緒將全部一千個號碼傳送完畢的話,大約需要103132ms,可見效率低下,耗費時間較長。
多執行緒傳送簡訊中的一個核心要點是,將全部手機號碼拆分成多個組後,分配給每個執行緒進行執行。
兩個執行緒的示例
/** * 兩個執行緒傳送 * * @param phoneList * @return */ private long twoThreads(List<String> phoneList) { long start = System.currentTimeMillis(); List<String> list1 = phoneList.subList(0, phoneList.size() / 2); List<String> list2 = phoneList.subList(phoneList.size() / 2, phoneList.size()); SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list1); smet.start(); SendMsgExtendThread smet1 = threadOperation.new SendMsgExtendThread(list2); smet1.start(); return 0; } 複製程式碼
另一種資料分組方式
/** * 另外一種分配方式 * * @param phoneList */ private void otherThread(List<String> phoneList) { for (int threadNo = 0; threadNo < 10; threadNo++) { int numbersPerThread = 10; List<String> list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 10); SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list); smet.start(); if (list.size() < numbersPerThread) { break; } } } 複製程式碼
執行緒池傳送
/** * 執行緒池傳送 * * @param phoneList * @return */ private void threadPool(List<String> phoneList) { for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) { int numbersPerThread = 10; List<String> list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 10); threadOperation.executorService.execute(threadOperation.new SendMsgExtendThread(list)); } threadOperation.executorService.shutdown(); } 複製程式碼
使用Callable傳送
/** * 多執行緒傳送 * * @param phoneList * @return */ private void multiThreadSend(List<String> phoneList) { List<Future<Long>> futures = new ArrayList<>(); for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) { int numbersPerThread = 100; List<String> list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 100); Future<Long> future = threadOperation.executorService.submit(threadOperation.new SendMsgImplCallable(list, String.valueOf(threadNo))); futures.add(future); } for (Future<Long> future : futures) { try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } threadOperation.executorService.shutdown(); } 複製程式碼
使用多執行緒傳送,將傳送任務進行分割然後分配給每個執行緒執行,執行完畢需要10266ms,可見執行效率明顯提升,消耗時間明顯縮短。
完整程式碼
package com.lingyejun.tick.authenticator; import org.apache.poi.ss.usermodel.DateUtil; import org.apache.poi.xssf.usermodel.XSSFCell; import org.apache.poi.xssf.usermodel.XSSFRow; import org.apache.poi.xssf.usermodel.XSSFSheet; import org.apache.poi.xssf.usermodel.XSSFWorkbook; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.*; public class ThreadOperation { // 發簡訊的同步等待時間 private static final long SEND_COST_TIME = 100L; // 手機號檔案 private static final String FILE_NAME = "/Users/lingye/Downloads/phone_number.xlsx"; // 手機號列表 private static List<String> phoneList = new ArrayList<>(); // 單例物件 private static volatile ThreadOperation threadOperation; // 執行緒個數 private static final int THREAD_POOL_SIZE = 10; // 初始化執行緒池 private ExecutorService executorService = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); public ThreadOperation() { // 從本地檔案中讀取手機號碼 readFromExcel(FILE_NAME); } public static void main(String[] args) { ThreadOperation threadOperation = getInstance(); //threadOperation.singleThread(phoneList); threadOperation.multiThreadSend(phoneList); } /** * 單例獲取物件 * * @return */ public static ThreadOperation getInstance() { if (threadOperation == null) { synchronized (ThreadOperation.class) { if (threadOperation == null) { threadOperation = new ThreadOperation(); } } } return threadOperation; } /** * 讀取Excel的檔案資訊 * * @param fileName */ public static void readFromExcel(String fileName) { InputStream is = null; try { is = new FileInputStream(fileName); XSSFWorkbook workbook = new XSSFWorkbook(is); XSSFSheet sheet = workbook.getSheetAt(0); int num = 0; // 迴圈行Row for (int rowNum = 0, lastNum = sheet.getLastRowNum(); rowNum <= lastNum; rowNum++) { XSSFRow row = sheet.getRow(rowNum); String phoneNumber = getStringValueFromCell(row.getCell(0)).trim(); phoneList.add(phoneNumber); } System.out.println(num); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 讀取Excel裡面Cell內容 * * @param cell * @return */ private static String getStringValueFromCell(XSSFCell cell) { // 單元格內的時間格式 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); // 單元格內的數字型別 DecimalFormat decimalFormat = new DecimalFormat("#.#####"); // 單元格預設為空 String cellValue = ""; if (cell == null) { return cellValue; } // 按型別讀取 if (cell.getCellType() == XSSFCell.CELL_TYPE_STRING) { cellValue = cell.getStringCellValue(); } else if (cell.getCellType() == XSSFCell.CELL_TYPE_NUMERIC) { // 日期轉為時間形式 if (DateUtil.isCellDateFormatted(cell)) { double d = cell.getNumericCellValue(); Date date = DateUtil.getJavaDate(d); cellValue = dateFormat.format(date); } else { // 其他轉為數字 cellValue = decimalFormat.format((cell.getNumericCellValue())); } } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BLANK) { cellValue = ""; } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BOOLEAN) { cellValue = String.valueOf(cell.getBooleanCellValue()); } else if (cell.getCellType() == XSSFCell.CELL_TYPE_ERROR) { cellValue = ""; } else if (cell.getCellType() == XSSFCell.CELL_TYPE_FORMULA) { cellValue = cell.getCellFormula().toString(); } return cellValue; } /** * 外部介面耗時長,通過多執行緒增強 * * @param userPhone */ public void sendMsgToPhone(String userPhone) { try { Thread.sleep(SEND_COST_TIME); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("send message to : " + userPhone); } /** * 單執行緒傳送 * * @param phoneList * @return */ private long singleThread(List<String> phoneList) { long start = System.currentTimeMillis(); /*// 直接主執行緒執行 for (String phoneNumber : phoneList) { threadOperation.sendMsgToPhone(phoneNumber); }*/ SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(phoneList); smet.start(); long totalTime = System.currentTimeMillis() - start; System.out.println("單執行緒傳送總時間:" + totalTime); return totalTime; } /** * 另外一種分配方式 * * @param phoneList */ private void otherThread(List<String> phoneList) { for (int threadNo = 0; threadNo < 10; threadNo++) { int numbersPerThread = 10; List<String> list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 10); SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list); smet.start(); if (list.size() < numbersPerThread) { break; } } } /** * 兩個執行緒傳送 * * @param phoneList * @return */ private long twoThreads(List<String> phoneList) { long start = System.currentTimeMillis(); List<String> list1 = phoneList.subList(0, phoneList.size() / 2); List<String> list2 = phoneList.subList(phoneList.size() / 2, phoneList.size()); SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list1); smet.start(); SendMsgExtendThread smet1 = threadOperation.new SendMsgExtendThread(list2); smet1.start(); return 0; } /** * 執行緒池傳送 * * @param phoneList * @return */ private void threadPool(List<String> phoneList) { for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) { int numbersPerThread = 10; List<String> list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 10); threadOperation.executorService.execute(threadOperation.new SendMsgExtendThread(list)); } threadOperation.executorService.shutdown(); } /** * 多執行緒傳送 * * @param phoneList * @return */ private void multiThreadSend(List<String> phoneList) { List<Future<Long>> futures = new ArrayList<>(); for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) { int numbersPerThread = 100; List<String> list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 100); Future<Long> future = threadOperation.executorService.submit(threadOperation.new SendMsgImplCallable(list, String.valueOf(threadNo))); futures.add(future); } for (Future<Long> future : futures) { try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } threadOperation.executorService.shutdown(); } public class SendMsgExtendThread extends Thread { private List<String> numberListByThread; public SendMsgExtendThread(List<String> numberList) { numberListByThread = numberList; } @Override public void run() { long startTime = System.currentTimeMillis(); for (int i = 0; i < numberListByThread.size(); i++) { System.out.print("no." + (i + 1)); sendMsgToPhone(numberListByThread.get(i)); } System.out.println("== single thread send " + numberListByThread.size() + "execute time:" + (System.currentTimeMillis() - startTime) + " ms"); } } public class SendMsgImplCallable implements Callable<Long> { private List<String> numberListByThread; private String threadName; public SendMsgImplCallable(List<String> numberList, String threadName) { numberListByThread = numberList; this.threadName = threadName; } @Override public Long call() throws Exception { Long startMills = System.currentTimeMillis(); for (String number : numberListByThread) { sendMsgToPhone(number); } Long endMills = System.currentTimeMillis(); return endMills - startMills; } } } 複製程式碼