Java多執行緒批量讀取檔案(二)--讀寫分離
阿新 • • 發佈:2019-01-07
package com.net.thread.future;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Demonstrates batch file copying with separate "read" and "write" tasks that share one
 * fixed-size thread pool.
 *
 * <p>For every regular file in the input directory a {@link MyCallableProducer} opens the file
 * and publishes the open stream through a {@link Future}. A {@link MyCallableConsumer} takes
 * that future from a shared queue and copies the stream's text into the output directory.
 *
 * @author Chen Yao (陳瑤)
 * @version 1.0
 */
public class CallableDemo3 {

    /**
     * Timestamp format used in console messages. {@link SimpleDateFormat} is not thread-safe,
     * so worker threads must format through {@link #now()} rather than calling it directly.
     */
    final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Formats the current time with {@link #sdf} while holding its monitor, so concurrent
     * callers cannot corrupt the formatter's internal state.
     *
     * @return the current time as {@code yyyy-MM-dd HH:mm:ss}
     */
    private static String now() {
        synchronized (sdf) {
            return sdf.format(new Date());
        }
    }

    /**
     * Submits one reader task and one writer task per regular file found in the input directory,
     * waits until all of them have finished, and then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        File f = new File("C://Users//LENOVO//Desktop//file");
        File[] filePaths = f.listFiles();
        if (filePaths == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            System.err.println("Cannot list input directory: " + f.getAbsolutePath());
            return;
        }

        // Only regular files can be opened with FileInputStream; skip sub-directories.
        final List<File> filePathsList = new ArrayList<File>();
        for (File s : filePaths) {
            if (s.isFile()) {
                filePathsList.add(s);
            }
        }

        CountDownLatch latch = new CountDownLatch(filePathsList.size());
        ExecutorService pool = Executors.newFixedThreadPool(10);
        BlockingQueue<Future<Map<String, FileInputStream>>> queue =
                new ArrayBlockingQueue<Future<Map<String, FileInputStream>>>(100);
        List<Future<?>> writers = new ArrayList<Future<?>>(filePathsList.size());

        System.out.println("-------------檔案讀、寫任務開始時間:" + now());
        try {
            for (File temp : filePathsList) {
                Future<Map<String, FileInputStream>> future =
                        pool.submit(new MyCallableProducer(latch, temp));
                // put() waits for space instead of throwing once 100 futures are pending.
                queue.put(future);
                writers.add(pool.submit(new MyCallableConsumer(queue)));
            }

            // All reader tasks have finished (successfully or not) once the latch opens.
            latch.await();

            // The latch only tracks readers; wait for the writers too so that no copy is
            // abandoned when the pool is shut down.
            for (Future<?> writer : writers) {
                try {
                    writer.get();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // On the normal path every task is already done; on interruption or an unexpected
            // error this cancels whatever is left so the JVM can exit.
            pool.shutdownNow();
        }
        System.out.println("-------------檔案讀、寫任務結束時間:" + now());
    }

    /**
     * Reader task: opens one file and returns a single-entry map from the file's name to its
     * open input stream. The stream is left open for the consumer to read and close.
     */
    static class MyCallableProducer implements Callable<Map<String, FileInputStream>> {
        private final CountDownLatch latch;
        private final File file;

        /**
         * @param latch counted down exactly once when {@link #call()} finishes, even on failure
         * @param file  the file to open
         */
        public MyCallableProducer(CountDownLatch latch, File file) {
            this.latch = latch;
            this.file = file;
        }

        /**
         * Opens the file, runs the simulated work, and returns the open stream keyed by file name.
         *
         * @return a map with one entry: file name to open {@link FileInputStream}
         * @throws Exception if the file cannot be opened
         */
        @Override
        public Map<String, FileInputStream> call() throws Exception {
            try {
                System.out.println(Thread.currentThread().getName() + " 執行緒開始讀取檔案 :" + file.getName() + " ,時間為 " + now());
                Map<String, FileInputStream> fileMap = new HashMap<String, FileInputStream>();
                fileMap.put(file.getName(), new FileInputStream(file));
                doWork();
                System.out.println(Thread.currentThread().getName() + " 執行緒讀取檔案 :" + file.getName() + " 完畢" + " ,時間為 " + now());
                return fileMap;
            } finally {
                // Count down on every exit path; otherwise one unreadable file would leave
                // the thread waiting on the latch blocked forever.
                latch.countDown();
            }
        }

        /**
         * Placeholder for real work: sleeps for a random whole number of seconds from 0 to 9.
         * Business logic (for example wrapping data into POJOs) could be added here.
         */
        private void doWork() {
            Random rand = new Random();
            int time = rand.nextInt(10) * 1000;
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                // Restore the flag so the pool can observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Writer task: takes one future from the queue, waits for its result, and copies each
     * stream in the resulting map, line by line, to a file of the same name under
     * {@code d:\gc3}.
     */
    static class MyCallableConsumer implements Runnable {
        private final BlockingQueue<Future<Map<String, FileInputStream>>> queue;

        /**
         * @param queue2 queue of reader futures; each consumer takes exactly one element
         */
        public MyCallableConsumer(BlockingQueue<Future<Map<String, FileInputStream>>> queue2) {
            this.queue = queue2;
        }

        /** Takes one reader result from the queue and copies every stream it contains. */
        @Override
        public void run() {
            try {
                Future<Map<String, FileInputStream>> future = queue.take();
                Map<String, FileInputStream> map = future.get();
                for (Map.Entry<String, FileInputStream> entry : map.entrySet()) {
                    copy(entry.getKey(), entry.getValue());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                // The reader task failed (for example the file could not be opened).
                e.printStackTrace();
            }
        }

        /**
         * Copies the text of {@code fis} to the output file named {@code fileName}, preceded by
         * a banner line naming the writing thread. Both streams are closed before returning.
         *
         * @param fileName name of the output file inside the output directory
         * @param fis      open stream to read; decoded as UTF-8
         */
        private void copy(String fileName, FileInputStream fis) {
            System.out.println(Thread.currentThread().getName() + " 執行緒開始寫檔案 :" + fileName + " ,時間為 " + now());
            File dirFile = new File("d:" + File.separator + "gc3" + File.separator + fileName);
            File outDir = dirFile.getParentFile();
            if (outDir != null) {
                // Best effort: if this fails, opening the output file below reports the error.
                outDir.mkdirs();
            }
            // try-with-resources closes both streams even when opening the writer fails.
            // The output is written as UTF-8 to match how the input is decoded.
            try (BufferedReader br = new BufferedReader(
                         new InputStreamReader(fis, StandardCharsets.UTF_8));
                 BufferedWriter bw = new BufferedWriter(
                         new OutputStreamWriter(new FileOutputStream(dirFile), StandardCharsets.UTF_8))) {
                bw.write("+++++++++++++" + Thread.currentThread().getName() + " 執行緒開始寫檔案++++++++++++");
                bw.newLine();
                String data;
                while ((data = br.readLine()) != null) {
                    bw.write(data);
                    // Use the platform line separator instead of a bare carriage return.
                    bw.newLine();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}