1. 程式人生 > java多執行緒批量讀取檔案(二)--讀寫分離

java多執行緒批量讀取檔案(二)--讀寫分離

package com.net.thread.future;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Demo: copies every file of a source directory using separate "read" and "write" tasks
 * that share one fixed-size thread pool.
 *
 * <p>For each source file a {@link MyCallableProducer} opens the file and publishes the open
 * stream through a {@link Future}; a {@link MyCallableConsumer} takes the next future from a
 * shared queue and copies the stream line by line into the target directory.
 *
 * <p>Created: 2017-08-16 17:26:37
 *
 * @author 陳瑤
 * @version 1.0
 */
public class CallableDemo3 {

	final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

	/**
	 * Formats the current time with {@link #sdf}.
	 *
	 * <p>{@link SimpleDateFormat} is not thread-safe and this formatter is shared by every
	 * pool thread, so all access goes through this synchronized helper.
	 *
	 * @return the current time as {@code yyyy-MM-dd HH:mm:ss}
	 */
	private static String now()
	{
		synchronized (sdf) {
			return sdf.format(new Date());
		}
	}

	/**
	 * Submits one read task and one write task per file of the source directory, then waits
	 * until every read AND every write has finished before printing the end time.
	 *
	 * @param args unused
	 */
	public static void main(String[] args)
	{
		File f = new File("C://Users//LENOVO//Desktop//file");
		File[] filePaths = f.listFiles();
		if (filePaths == null) {
			// listFiles() returns null when the path is not a directory or cannot be read.
			System.err.println("Cannot list source directory: " + f.getAbsolutePath());
			return;
		}

		// All files to process. Sub-directories are skipped: they cannot be opened as streams.
		final List<File> filePathsList = new ArrayList<File>();
		for (File s : filePaths) {
			if (s.isFile()) {
				filePathsList.add(s);
			}
		}

		CountDownLatch latch = new CountDownLatch(filePathsList.size());
		ExecutorService pool = Executors.newFixedThreadPool(10);

		// Sized to hold one future per file so that add() can never fail with "Queue full".
		// (ArrayBlockingQueue rejects a capacity of 0, hence the lower bound of 1.)
		BlockingQueue<Future<Map<String, FileInputStream>>> queue =
				new ArrayBlockingQueue<Future<Map<String, FileInputStream>>>(
						Math.max(1, filePathsList.size()));

		System.out.println("-------------檔案讀、寫任務開始時間:" + now());
		try {
			for (File temp : filePathsList) {
				// The producer is always submitted before its consumer. The pool runs tasks in
				// FIFO order, so a consumer only ever waits on a producer that has already started.
				Future<Map<String, FileInputStream>> future = pool.submit(new MyCallableProducer(latch, temp));
				queue.add(future);

				pool.execute(new MyCallableConsumer(queue));
			}
		} finally {
			// No more submissions; tasks that are already queued still run to completion.
			pool.shutdown();
		}

		try {
			// All read tasks finished ...
			latch.await();
			// ... and all write tasks finished as well (the latch only tracks producers).
			while (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
				// still copying; keep waiting
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
			// Asked to stop: cancel whatever is still pending or running.
			pool.shutdownNow();
		}
		System.out.println("-------------檔案讀、寫任務結束時間:" + now());
	}


	/**
	 * File "read" task: opens the file and returns a single-entry map from the file name to
	 * the open stream. The consumer that receives the map is responsible for closing the stream.
	 */
	static class MyCallableProducer implements Callable<Map<String, FileInputStream>>
	{
		private final CountDownLatch latch;
		private final File file;

		/**
		 * @param latch counted down exactly once when {@link #call()} completes, normally or not
		 * @param file  the file to open
		 */
		public MyCallableProducer(CountDownLatch latch, File file)
		{
			this.latch = latch;
			this.file = file;
		}

		/**
		 * Opens the file, runs the simulated work and returns the open stream keyed by file name.
		 *
		 * @return map with one entry: file name to open input stream
		 * @throws Exception if the file cannot be opened (surfaces to the consumer through
		 *                   {@link Future#get()})
		 */
		@Override
		public Map<String, FileInputStream> call() throws Exception
		{
			try {
				System.out.println(Thread.currentThread().getName() + " 執行緒開始讀取檔案 :" + file.getName() + " ,時間為 " + now());
				FileInputStream fis = new FileInputStream(file);
				Map<String, FileInputStream> fileMap = new HashMap<String, FileInputStream>();
				fileMap.put(file.getName(), fis);
				doWork();
				System.out.println(Thread.currentThread().getName() + " 執行緒讀取檔案 :" + file.getName() + " 完畢" + " ,時間為 " + now());
				return fileMap;
			} finally {
				// Always release the latch; otherwise one unreadable file blocks main() forever.
				latch.countDown();
			}
		}

		/**
		 * Placeholder for business logic (e.g. wrapping the data into POJOs; it could return any
		 * type). Currently simulates work by sleeping a random 0-9 seconds.
		 */
		private void doWork()
		{
			Random rand = new Random();
			int time = rand.nextInt(10) * 1000;
			try {
				Thread.sleep(time);
			} catch (InterruptedException e) {
				e.printStackTrace();
				// Preserve the interrupt for whoever runs this task.
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * File "write" task: takes one future from the shared queue, waits for its producer and
	 * copies every stream of the resulting map into the target directory.
	 */
	static class MyCallableConsumer implements Runnable
	{
		private final BlockingQueue<Future<Map<String, FileInputStream>>> queue;

		/**
		 * @param queue2 shared queue of producer futures; this task consumes exactly one element
		 */
		public MyCallableConsumer(BlockingQueue<Future<Map<String, FileInputStream>>> queue2)
		{
			this.queue = queue2;
		}

		/**
		 * Takes the next future, blocks until its producer has finished and copies each stream.
		 * A failed producer is reported via its {@link ExecutionException} and nothing is written.
		 */
		@Override
		public void run()
		{
			try {
				Future<Map<String, FileInputStream>> future = queue.take();
				Map<String, FileInputStream> map = future.get();
				for (Map.Entry<String, FileInputStream> entry : map.entrySet()) {
					copyToTarget(entry.getKey(), entry.getValue());
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
				Thread.currentThread().interrupt();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}

		/**
		 * Copies {@code fis} line by line into the target file named {@code fileName}, preceded
		 * by a banner naming the writing thread. Both streams are always closed.
		 *
		 * @param fileName name of the target file inside the output directory
		 * @param fis      open source stream, decoded as UTF-8; closed by this method
		 */
		private void copyToTarget(String fileName, FileInputStream fis)
		{
			System.out.println(Thread.currentThread().getName() + " 執行緒開始寫檔案 :" + fileName + " ,時間為 " + now());
			BufferedReader br = null;
			BufferedWriter bw = null;
			try {
				br = new BufferedReader(new InputStreamReader(fis, "utf-8"));
				File dirFile = new File("d:" + File.separator + "gc3" + File.separator + fileName);
				// NOTE(review): FileWriter encodes with the platform default charset while the
				// input is decoded as UTF-8 - confirm that this is intended.
				bw = new BufferedWriter(new FileWriter(dirFile));
				// NOTE(review): no separator follows the banner and lines end with a bare "\r";
				// kept unchanged - confirm whether "\r\n" was intended.
				bw.write("+++++++++++++" + Thread.currentThread().getName() + " 執行緒開始寫檔案++++++++++++");
				String data;
				while ((data = br.readLine()) != null) {
					bw.write(data + "\r");
				}
			} catch (IOException e) {
				// Also covers FileNotFoundException (e.g. the target directory does not exist).
				e.printStackTrace();
			} finally {
				// Close independently so that a failure on one side cannot leak the other.
				closeQuietly(bw);
				// Closing the reader closes fis; fall back to fis if the reader was never built.
				closeQuietly(br != null ? br : fis);
			}
		}

		/** Closes {@code resource} if it is non-null, reporting (not propagating) any failure. */
		private static void closeQuietly(Closeable resource)
		{
			if (resource == null) {
				return;
			}
			try {
				resource.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}