1. 程式人生 > >10億條long型資料外部排序的檔案分割實踐及優化過程(JAVA)

10億條long型資料外部排序的檔案分割實踐及優化過程(JAVA)

 一、題目

    生成10億個long隨機數正整數,把它寫入一個檔案裡。然後實現一個函式 fetch(int k,int n)。(fetch函式的輸出結果是這10億個正整數中從小到大中第k個開始(不包含第k個)往後取n個數。)     給定記憶體為2G(一開始為1G)。

二、題目分析

    (1)首先生成10億個long隨機正整數,可考慮使用ThreadLocalRandom和多執行緒生成隨機數。由於全部資料記憶體佔用10幾G,需要分批寫入檔案。(一個數據一行,行末為\n)     (2)fetch函式的實現:     1.先對隨機數進行外部排序。由於隨機數檔案較大,無法一次性讀取全部資料進行排序,所以必須對隨機數檔案進行分割成多個完成資料排序的小檔案,然後通過多路歸併實現外部排序。     2.然後實現fetch函式,輸出結果     因此本文主要針對外部排序的檔案分割部分進行說明,至於多路歸併和fetch函式的實現本文暫不開展。

 三、程式設計

    本文主要針對大檔案切割的程式設計進行分析。         設計思路:通過BufferedReader的readLine()方法讀取每一行資料為String(read),並將String轉換為long(parse),存放在一個long[]數組裡。當裝滿long[]時通過Araay.sort()排序(sort),將排序好的long[]按行輸出(write)。(long[]大小自行設定)通過多次反覆操作實現大檔案分割。(為了方便描述,後文將大檔案切割分為read、parse、sort、write四個部分來描述)為了加快效率,我用一個執行緒執行read、parse,另一個執行緒執行sort、write,兩執行緒間用BlockingQueue交流資料。     本方案程式碼實現簡單,但是效率不忍直視,而且由於過大的記憶體開銷,很容易就堆滿了,沒等它執行成功就對它進行優化了。。。     優化分為以下幾個部分     read部分:     10億個資料要讀10億次。。。因此read部分必須進行優化。因此採用RandomAccessFile和多執行緒結合進行讀取,根據偏移量進行分次讀取,每次讀取32M(這個量是比較快而且不容易出現堆滿的)。讀取的位元組資料存放在byte[]資料裡,這時會出現一個新問題:每次讀取的資料的末尾不一定是以“\n”結束,那麼必定有個隨機數被分割了!     由於採用多執行緒進行IO讀操作,因此為了解決隨機數被分割問題肥了點心思。     主要思路是每完成一次read(b, 0, length)之後,往後繼續read()一個byte,直到遇到第一個[10](即'\n');同時還要判斷每次read起始部分是否為完整的一個隨機數,從byte[0]開始判斷直到遇見第一個[10](即'\n')。這樣才能在parse的時候資料時完整的。由於程式碼不小心刪了,這裡就提供一個思路。     parse部分:     將byte[]轉為String,再用parseLong()轉為long,然後放入ArrayList。。。其實這很蠢。     sort部分:     直接呼叫sort函式。     write部分:     將long資料按行寫出為字元。後來考慮到分割的檔案是臨時檔案,用完就刪。所以用DataOutputStream包裝BufferOutputStream輸出為一個個8位元組的long,這樣減少了一半以上的檔案大小,且能提高輸出效率。(ps:這算是一個不錯的想法)     歷經千辛萬苦,跑了20幾分鐘才分割完資料,太慢了。     反思:方案一有許多不足之處。     採用多執行緒進行I/O操作並不一定會提高效率,有時反而會影響效率。因為一個磁碟一個時間段內只能進行一個I/O操作,如果通過多執行緒進行I/O操作,可能造成每次I/O是磁頭尋道的偏移量較大,也就是尋道時間長,反而增加了I/O時間。     其次parse部分。都說了好蠢。謹慎使用String,每次新的一個String都會佔用常量池。為了避免使用String,我尋思如何直接將byte[]資料轉換為long。於是乎想到了迭代計算,同時參考parseLong()的原始碼,進行優化。

優化分析:

    這個問題想要實現質的優化,必須合理使用多執行緒。後經高人點播:既然大檔案分割分為read、parse、sort、write四個部分,而且電腦是四核(二核四執行緒),那麼一個部分用一個執行緒進行操作,形成一條流水線,流水線上的資料通過BlockingQueue來傳遞。

 read部分:

//方案一    
class ReadData implements Runnable{
    private int spiltSize;//每次讀取檔案的大小
    private File file;//原始檔
    private int spiltNum;//read次數,spiltNum=file.length/spiltSize(有餘數就加1)
    private long startPosition ;//每次read的偏移量
    private BlockingQueue<byte[]> bq;
    ReadData(File file,int spiltSize,int spiltNum,BlockingQueue<byte[]> bq){
        this.file=file;    
        this.spiltSize=spiltSize;
        this.spiltNum=spiltNum;
        this.bq=bq;
    }
    public void run() {
        try(RandomAccessFile raf = new RandomAccessFile(file,"r");){            
            byte[] b=new byte[spiltSize];
            for(int k=0;k<spiltNum;k++) {
                startPosition =(long)k*(long)spiltSize;
                raf.seek(startPosition);
                int read = raf.read(b, 0, spiltSize);
                if(read==spiltSize) {                    
                    bq.put(b);
                }else {
                    byte[] temp = Arrays.copyOfRange(b, 0, read);
                    bq.put(temp);        
                }
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

    這裡將每次讀取的資料直接傳遞給parse。為了減少寫出檔案的數量,我儘可能的將spiltSize設定大,這個方式並沒有充分利用多執行緒,read時間較長,後置的執行緒(parse、sort、write)處於阻塞狀態。由於缺乏對磁碟IO的理解,我侷限的認為一次性讀取的資料越大,減少I/O次數而提高效率,同時我又想保證每次寫出的資料也越多越好,這樣也可以減少後續歸併的路數。因此將每次讀取檔案大小盡可能調大。     通過對磁碟IO的瞭解,影響磁碟的關鍵因數是磁碟服務時間,即磁碟完成一個I/O請求所花費的時間,它由尋道時間、旋轉延遲和資料傳輸時間三部分構成。其中尋道時間、旋轉延遲是佔主要的,資料傳輸時間可以忽略。由於磁碟上每個扇區512byte,而作業系統的檔案系統不是一個扇區一個扇區的來讀資料,所以有了block(塊)的概念,它是一個塊一個塊的讀取的,塊(block)是基本的資料傳輸單元(一般的作業系統block size為4k)。那麼在磁碟上的同樣儲存位置,JAVA進行1024次4k的IO請求和進行1次4M的IO請求,在磁碟服務時間應該是差不多的(不知道這麼理解對不對,如有不對之處請指出)。那麼減少每次read的大小(保證為block size 的整數倍),保證流水線一直處於執行狀態,提高CPU的利用率。     ps:作業系統層對於IO的影響這邊就不考慮了。     針對這個問題做了優化。     (1)使用FileChannel包裝(FileInputStream)來進行讀檔案,每次讀取檔案大小spiltSize為8k。(由於電腦較差,每次執行結果波動很大,不能判斷哪種spiltSize取多少最好,經測試8k比4k好,16k比8k略差。這個地方有待繼續論證,但不影響整體的設計思路。FileChannel包裝也可以包裝RandomAccessFile,還沒測試出FileInputStream和RandomAccessFile哪個好,有高手可以指出)     (2)考慮的原始檔來源可能是網路傳輸,並不能識別原始檔實際大小,無法判斷實際分割的小檔案的個數,因此取消了`for(int i=0;i<spiltNum;i++)`來判定迴圈,而改用`while(true) `的死迴圈,直到原始檔讀取完畢退出,同時向下遊傳送一個size=0的陣列,作為結束的標誌,這種處理方式提高了程式的健壯性。(值得推廣)     程式碼優化如下:

//方案二
public void run() {
        try(
            FileInputStream fis = new FileInputStream(file);    //z這裡也可以包裝RandomAccessFile
            FileChannel fc = fis.getChannel();
        ){    
            ByteBuffer bb= ByteBuffer.allocate(spiltSize);//spiltSize
            long startPosition ;
            int read;
            int k=0;
            byte[] temp;
            while(true) {
                startPosition =(long)k*(long)spiltSize;    
                read=fc.read(bb, startPosition);            
                if(read!=-1) {
                        temp =new byte[read];
                        bb.flip();
                        bb.get(temp);
                        bb.clear();
                        bq.put(temp);
                        k++;
                }else {
                        bq.put(new byte[0]);
                        break;                            
                }        
            }
        }catch(IOException e) {
            e.printStackTrace();
        }catch(InterruptedException e1) {
            e1.printStackTrace();
        }
    }    

parse部分:

    parseLong()方法也是直接將byte[]轉化為long。那麼鑑於此,我通過迭代計算將byte[]陣列轉換為long,同時用位運算計算乘法更快。     一開始由於不知道byte[]中可以parse出多少long資料,無法宣告一個固定大小的陣列,所以我這裡考慮用List,這樣增加的拆裝與包裝的過程。在read部分優化之後,考慮到每次讀取資料為8k,需要積累一定數量的long資料再輸出比較好好,所以通過自定義一個固定大小的long[]陣列來儲存parse後的long型資料。這樣可以避免使用LIst<Long>,一定程度減小了記憶體開銷和提高了效能。   

 //方案二,方案一使用List,這裡不給出程式碼
    class Parse implements Runnable{
    private BlockingQueue<byte[]> bq;
    private BlockingQueue<long[]> bq1;
    private int num ;//自定義儲存long資料的個數
    Parse(BlockingQueue<byte[]> bq,BlockingQueue<long[]> bq1,int num){
        this.bq=bq;
        this.bq1=bq1;
        this.num=num;
    }
    public void run() {
        byte[] b;
        int count=0;
        long l=0L;//byte[]轉為long型資料,臨時
        long[] temp =new long[num];
        try {
            while(true) {
                b=bq.take();                
                if(b.length==0) {
                    long[] temp2= new long[count];                    
                    System.arraycopy(temp, 0,temp2,0,count);
                    bq1.put(temp2);
                    bq1.put(new long[0]);
                    break;
                }else {                        
                    for(int j=0,len=b.length;j<len;j++) {
                        if(b[j]!='\n') {
                            //l=l*10+(long)(b[j]-48);優化前
                            l=(l<<3)+(l<<1)+(b[j]-'0');//優化後
                        }else{
                            temp[count]=l;
                            count++;
                            l=0L;
                            if(count==num) {
                                bq1.put(temp);    
                                count=0;
                                temp =new long[num];
                            }
                        }
                    }
                }
            }            
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

sort部分:

    這個部分沒啥問題,sort部分是4個部分中執行速度最快的。     

//方案二
    class Sort implements Runnable{
    private BlockingQueue<long[]> bq1;
    private BlockingQueue<long[]> bq2;
    Sort(BlockingQueue<long[]> bq1,BlockingQueue<long[]> bq2){
        this.bq1=bq1;
        this.bq2=bq2;        
    }
    public void run() {
        long[] longs;
        try {
            while(true) {
                longs=bq1.take();
                if(longs.length==0) {
                    bq2.put(new long[0]);
                    break;
                }else {
                    Arrays.sort(longs);
                    bq2.put(longs);
                }
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

 write部分:

    方案一write部分的寫操作是通過遍歷一個個寫出long,效率比較低,造成一定的堵塞狀態,需考慮其他IO方式,一次性寫出較多的資料。後通過FileChannel  和 ByteBuffer寫資料,具體使用方法可以上網查閱NIO的API。以緩衝塊的方式寫出資料比一個一個寫明顯快多了。

  //方案一
        /*
        long[] l=bq2.take();
        DataOutputStream dos = new DataOutputStream(new BufferedOutputStream (new FileOutputStream(new File(tempFolder.getAbsolutePath()+"\\檔案"+i+".txt"))));
        for(long a:l) {
            dos.writeLong(a);
        }
        dos.close();
        */
    //方案二    
    class WriteData implements Runnable{
    private BlockingQueue<long[]> bq2;
    private File tempFolder;//臨時資料夾
    private CountDownLatch end;//告訴主執行緒可以繼續執行,主要用於計時
    private int num ;//自定義儲存long資料的個數
    private long[] longs;
    private int count =0;
    WriteData(BlockingQueue<long[]> bq2,File tempFolder,CountDownLatch end,int num){
        this.bq2=bq2;
        this.end=end;
        this.tempFolder=tempFolder;
        this.num=num;        
    }
    public void run() {
        try {
            ByteBuffer bb= ByteBuffer.allocate(num*8);
            while(true) {
                longs=bq2.take();
                bb.asLongBuffer().put(longs);
                if(longs.length==num) {
                    try(
                        FileOutputStream fos = new FileOutputStream(new File(tempFolder.getAbsolutePath()+"\\檔案"+count+".txt"));
                        FileChannel fc = fos.getChannel();
                    ){
                        fc.write(bb);
                        bb.clear();
                        count++;
                    }catch (IOException e) {
                        e.printStackTrace();
                    }
                }else if(longs.length!=0) {
                    bb.limit(longs.length*8);
                    try(
                        FileOutputStream fos = new FileOutputStream(new File(tempFolder.getAbsolutePath()+"\\檔案"+count+".txt"));
                        FileChannel fc = fos.getChannel();
                    ){
                        fc.write(bb);
                        bb.clear();
                    }catch (IOException e) {
                        e.printStackTrace();
                    }                
                }else {
                    end.countDown();
                    break;
                }
            }
        }catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

    總結:     通過分析,保證read和write能夠不間斷的佔用磁碟活動,且提高IO速度才是提高程式效能的關鍵。通過監控各個階段的BlockingQueue阻塞情況得出read<parse>sort<write(>表示前者單次迴圈用時大於後者,<反之)。由於parse部分消耗時間比較多,read的部分可能存在堵塞,需要合理設定BlockingQueue的大小。由於read操作比write操作頻繁,兩執行緒會同時爭搶磁碟IO操作,在系統層面排程可能造成不能及時write導致輕微堵塞情況。我考慮是不是利用鎖或者同一個執行緒來控制IO操作,保證write能及時將資料寫出。     同時方案二中還存在記憶體開銷較大的情況,GC次數較多,影響IO效率。如:迴圈中不斷new byte[]和new long[]。這裡需要考慮記憶體複用,減少記憶體開銷。這對整體的效能有很大的影響。除了優化GC次數外,過大的記憶體開銷也會降低磁碟IO的速率(原因本人暫不瞭解,有高人可以指點一下),因此記憶體複用,降低記憶體開銷是必須的。

 記憶體複用以及在應用層控制IO:

    首先是記憶體複用問題。     在原來的方案中,在迴圈內不停new新的物件,很容易就出現的堆滿異常。因此記憶體複用對效能的優化很重要。     將陣列比作盤子,資料比作食物。如果使用new的方式建立盤子(陣列),食物被吃完了盤子就等GC把它當垃圾回收了。不斷建立和銷燬盤子是消耗效能的。因此將被清洗乾淨的盤子重新利用起來保證記憶體複用是很重要的。具體怎麼實現呢?我的思路是先分析某一種盤子(如byte[]陣列)在程式執行中同一時刻最大的使用量,盤子(如byte[]陣列)的最大使用量為BlockingQueue中的數量加上執行緒中處理的數量。(這樣的話需要對BlockingQueue的大小進行限制,這需要根據程式實際執行情況來設定大小。)因此一開始建立最大需求量的盤子,通過迴圈使用來減少記憶體開銷。     具體實現如下,以read部分程式碼為例:

class ReadData implements Runnable{
    private File file;
    private BlockingQueue<byte[]> bq;
    private int spiltSize;
    private LinkedList<byte[]> list;//裝盤子的容器
    ReadData(File file,BlockingQueue<byte[]> bq,int spiltSize){
        this.spiltSize=spiltSize;
        this.file=file;
        this.bq=bq;
    }
    /*
    *init()初始化byte[]陣列並存在LinkedList裡面
    *而後通過temp =list.remove()從頭取陣列和list.add(temp)從尾加陣列
    *這種的方式保證List裡的盤子總數量不變,且第一個被使用的陣列內的資料處理完之前不會複用這個陣列
    *其實也可以通過get(index)的方式迴圈取陣列,但是這種方式計算開銷會略大一點
    */
    public void init() {//初始化盤子
        list = new LinkedList<byte[]>();
        for(int i=0;i<30;i++) {
            byte[] b = new byte[spiltSize];
            list.add(b);
        }
    }
    public void run() {
        try(
            RandomAccessFile raf = new RandomAccessFile(file,"r");    
            FileChannel fc = raf.getChannel();
        ){    
            ByteBuffer bb= ByteBuffer.allocate(spiltSize);
            long startPosition ;
            int read;
            int k=0;
            byte[] temp;
            while(true) {
                startPosition =(long)k*(long)spiltSize;    
                read=fc.read(bb, startPosition);            
                if(read!=spiltSize) {//最後的臨界情況單獨考慮
                        temp =new byte[read];
                        bb.flip();
                        bb.get(temp);
                        bb.clear();
                        bq.put(temp);
                        bq.put(new byte[0]);
                        break;
                }else {
                        temp =list.remove();//取盤子
                        bb.flip();
                        bb.get(temp);
                        bb.clear();
                        bq.put(temp);
                        k++;
                        list.add(temp);    //將盤子放在隊尾,等待複用                
                }        
            }
        }catch(IOException e) {
            e.printStackTrace();
        }catch(InterruptedException e1) {
            e1.printStackTrace();
        }
    }    
}

    這裡有個不足之處,需要根據程式的執行情況,手動設定一個合理的總盤子數。可能需要一個更靈活的方法。          應用層控制IO     由於windows系統層對IO的排程存在不確定性,write程序的IO請求有時候可能會滯後,為保證write程序的資料能及時寫出,考慮採用synchronized方式對read()和write()進行上鎖(切記保證正確性的情況下,synchronized程式碼塊越小越好)。`synchronized(lock){read=fc.read(bb, startPosition);}`和`synchronized(lock){fc.write(bb);}`。執行的結果與不用鎖的情況(優化記憶體開銷後的方案二)比起來略好一點或者說更穩定一些,這可能是硬體和作業系統的問題。按理說增加獲取鎖的開銷,效能應該差一些。(這裡稱之為**雙執行緒有鎖控制IO**和**雙執行緒無鎖控制IO**,這裡是讀和寫執行緒分開)     另一種方式只用一個執行緒來控制所有IO操作。     一種想法是通過**單執行緒無腦執行IO操作**。每一次的read和write作為任務投遞給一個執行緒執行,用一個BlockingQueue裝著。這樣執行緒拿到任務就無腦執行了。當然這裡還有一個問題:因為投遞任務的速度很快,而且read任務和write任務不是一個數量級(20000:1),會造成write任務被read任務阻塞情況出現。由於BlockingQueue一致處於阻塞狀態,執行一個任務加一個任務,write任務一定正常執行,所以效能上並沒有什麼大的影響(總不至於20000次put的都是read任務吧,這概率得多小)。還有一個問題就是結束問題!因為新增read任務的人(主執行緒)不知道read任務的總數是多少,這時候會出現無效的read任務,這裡只能通過read任務的執行情況的反饋來告知主執行緒什麼時候停止任務。

BlockingQueue<Runnable> bq3 = new LinkedBlockingQueue<Runnable>(200);//這queue的大小可以調整
while(true) {
    if(rd.read!=-1) {//原始檔讀完後表示讀任務結束
        bq3.put(rd);
    else {
        bq3.remove(rd);//將多餘的無效任務刪除
        break;
    }
}

    但sort完成後投遞write任務,由最後一次write任務讓程式結束執行

    while(true) {
            longs=bq1.take();
            if(longs.length==0) {
                bq2.put(new long[0]);
                bq3.put(wd);
                break;
            }else {
                Arrays.sort(longs);
                bq2.put(longs);
                bq3.put(wd);
            }
        }
    }

    還有一種**單執行緒有序執行IO任務**,就是執行任務的執行緒一邊執行一邊一天新增任務。因為只有執行任務的執行緒才能第一時間直到任務啥時候結束。當然效率上肯定比無腦執行任務那種方式差一點。

public void run() {
        try (
                RandomAccessFile raf = new RandomAccessFile(file,"r");    
                FileChannel fc = raf.getChannel();
            ){    
            while(flag) {    //flag初始化為true,最後一次寫結束後flag=false                
                if(read!=-1) {
                    readData(fc);
                    if(Rcount%1000==0&&bq2.size()>0) {//迴圈一定次數後再判斷bq2中是否有資料需要寫出
                        writeData();
                    }
                }else {
                    writeData();
                }                
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }    
    }

    通過對以上幾種IO控制的方案進行對比(**雙執行緒有鎖控制IO**、**雙執行緒無鎖控制IO**、**單執行緒無腦執行IO操作**、**單執行緒有序執行IO任務**),其中**單執行緒無腦執行IO操作**執行出最快333s,平均IO速率將近80M/每秒。(讀檔案大小18.5G,寫檔案總大小7.45G)。由於電腦硬體老化的緣故,每次執行的結果相差較大,出現幾十秒的差距。因此從程式碼層面來講,**雙執行緒無鎖控制IO**效能也是可以的,只是將IO排程的控制交由作業系統處理了。不過由於執行IO的排程方式不同,導致他們的執行時記憶體佔用也不同。(後續進行更充分的測試補充結論)

總結:  

              JAVA不同的IO類有不同的特點,根據不同情況進行合理選擇,提高IO速度。在對IO進行優化時,需理解磁碟的物理結構和工作原理,避免走入誤區。(一開始沒找到問題根源所在,亂用多執行緒優化,浪費了大量的時間)每次讀取資料大塊檔案時,以4k的整數倍比較好。         充分利用多執行緒,提高CPU利用率,通過多執行緒進行非同步操作,就像“燒開水泡茶”一樣。         在對資料的處理時,應該儘量減少記憶體的開支。new一個物件是比較消耗效能的,應該儘量複用記憶體,減少GC次數。務必避免大量使用String,這個很佔記憶體。有些計算使用位運算更快。總之就是減少計算,減少記憶體開支,當然正確性是前提。         在保證正確性的前提下,提高程式的健壯性和可讀性也很重要。同時,設計程式首先想著不是如何修改功能,而是如何擴充套件功能。         不足之處:         parse的耗時較長,sort的耗時較小,如果將parse部分的計算量勻一部分到sort這樣可以提高CPU的利用率,減少堵塞情況。         記憶體複用的方式不夠好,應該設計一個更靈活的方法。         IO是否還有提升空間?嘗試使用記憶體對映的方式進行寫操作,但是由於寫檔案總大小較大,記憶體佔用(堆外記憶體)上升,IO效能下降。         程式中BlockingQueue的大小設定、每次讀寫檔案的大小對程式效能的影響還沒有明確的一個結論。本文中給出的引數,主要通過本人破電腦測試,選取結果較好的,但不一定就是好的。

        本人作為剛入門的新人,第一次寫部落格,不足之處還望各位指正。希望各位大佬指點指點