Java 多執行緒寫同一個檔案實現
阿新 • • 發佈:2019-01-30
最近專案中需要從網站上抓取大量的資料,採用了多執行緒技術,每個執行緒抓取的資料都需要儲存到一個檔案中,避免消耗大量的記憶體。
思路:多個訪問執行緒將需要寫入到檔案中的資料先儲存到一個佇列裡面,然後由專門的 寫出執行緒負責從佇列中取出資料並寫入到檔案中。
WriterQueue.java 存放要輸出的資料佇列
package com.yulore.write; import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class WriterQueue { private static final int MAX_QUEUE_SIZE = 5000; private LinkedList<String> queue = new LinkedList<String>(); private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); private static WriterQueue manager = new WriterQueue(); private WriterQueue(){ } public static WriterQueue getQueue(){ return manager; } public void put(String phone){ lock.lock(); try { while (queue.size() == MAX_QUEUE_SIZE) { System.out.println("warning: data queue is full!"); notFull.await(); } queue.addFirst(phone); notEmpty.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally{ lock.unlock(); } } public LinkedList<String> takeAll(){ LinkedList<String> retVal = new LinkedList<String>(); lock.lock(); try { while (queue.size() == 0) { System.out.println("warning: data queue is empty!"); notEmpty.await(); } retVal.addAll(queue); // for(String str : queue){ // retVal.add(str); // } //清空佇列 queue.clear(); notFull.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally{ lock.unlock(); } return retVal; } }
WriteTask_New.java 模擬產生資料的執行緒類
package com.yulore.write; public class WriteTask_New implements Runnable { @Override public void run() { for(int i=0;i<20;i++){ // try { // sleep(100); // } catch (InterruptedException e) { // e.printStackTrace(); // } WriterQueue.getQueue().put("for:"+i+" thread:"+Thread.currentThread().getName()); } } private void sleep(int millis) throws InterruptedException { Thread.sleep(millis); } }
OutputTask.java 負責將資料寫入到檔案中
package com.yulore.write; import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.LinkedList; public class OutputTask implements Runnable { private String fileName; public OutputTask(String fileName) { this.fileName = fileName; } @Override public void run() { while(true){ try { sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } LinkedList<String> list = WriterQueue.getQueue().takeAll(); write2Disk(list); list = null; } } private void write2Disk(LinkedList<String> list) { if(list==null ||list.size()==0){ System.out.println("no data..."); return; } System.out.println("開始序列化資料 "+fileName); String path = "D:/fbb/myWorkSpace_DW07/"; File outputFile = new File(path+fileName); if(outputFile==null ||!outputFile.exists()){ try { outputFile.createNewFile(); } catch (IOException e) { e.printStackTrace(); } } FileOutputStream out = null; OutputStreamWriter writer = null; BufferedWriter bw = null; try { out = new FileOutputStream(outputFile, true); writer = new OutputStreamWriter(out); bw = new BufferedWriter(writer); for(String content : list){ bw.write(content); bw.newLine(); bw.flush(); } } catch (IOException e) { e.printStackTrace(); }finally{ try { if(bw!=null) bw.close(); } catch (IOException e) { e.printStackTrace(); } } } private void sleep(int millis) throws InterruptedException { Thread.sleep(millis); } }
測試類
package com.yulore.write;
public class TestWrite {
/**
* @param args
*/
public static void main(String[] args) {
// test();
test02();
}
private static void test02() {
WriteTask_New write = new WriteTask_New();
for(int i=0;i<4;i++){
new Thread(write).start();
}
OutputTask output = new OutputTask("abc.txt");
new Thread(output).start();
}
private static void test() {
WriteTask write = new WriteTask("abc.txt");
for(int i=0;i<5;i++){
new Thread(write).start();
}
}
}