1. 程式人生 > >【排序演算法】外部排序二 —— 外部排序技術之多路歸併

【排序演算法】外部排序二 —— 外部排序技術之多路歸併

外部排序技術之多路歸併

重點:敗者樹的建立調整函式

1.外部排序概述

外部排序指的是大檔案的排序,即待排序的記錄儲存在外儲存器上,待排序的檔案無法一次裝入記憶體,需要在記憶體和外部儲存器之間進行多次資料交換,以達到排序整個檔案的目的。外部排序最常用的演算法是多路歸併排序,即將原檔案分解成多個能夠一次性裝人記憶體的部分,分別把每一部分調入記憶體完成排序。然後,對已經排序的子檔案進行歸併排序。

2. 多路歸併的實現

2.1 勝者樹

勝者進入下一輪,直至決出本次比賽的冠軍。決出冠軍之後,充分利用上一次比賽的結果,使得更快地挑出亞軍、第三名  ……  。

示例:我們這裡以四路歸併為例,假設每個歸併段已經在輸入緩衝區如下圖。

每路的第一個元素為勝利樹的葉子節點,(5,7)比較出5勝出成為其根節點,(29,9)比較9勝出成為其根節點,一次向上生成一棵勝利樹,然後我們可以得出5為冠軍,將第一路歸併段的元素5放入輸出緩衝區,然後將第一路第二個元素放到勝利樹中如下:

由第一次得到的勝利樹知,我們這裡只改變了第1路的葉子節點,所有根節點7的右子樹不用再比較,(16,7)比較7勝出,然後7和右子樹的勝利者比較7勝出得到亞軍,只進行了2次比較。

所以我們知道:

 決出第一名需比較:   k - 1     次

 決出第二名需比較:       次

 決出第三名需比較:       次 .............

2.2 敗者樹

與勝利樹相類似,敗者樹是在雙親節點中記錄下剛剛進行完的這場比賽的敗者,讓勝者去參加更高一層的比賽。

示例:我們這裡以四路歸併為例,假設每個歸併段已經在輸入緩衝區如下圖。

每路的第一個元素為勝利樹的葉子節點,(5,7)比較出5勝出7失敗成為其根節點,(29,9)比較9勝出29失敗成為其根節點,勝者(5,9)進行下次的比賽7失敗成為其根節點5勝出輸出到輸出緩衝區。由第一路歸併段輸出,所有將第一路歸併段的第二個元素加到葉子節點如下圖:

加入葉子節點16進行第二次的比較,跟勝利樹一樣,由於右子樹葉子節點沒有發生變化其右子樹不用再繼續比較。

2.3 敗者樹程式實現

在建立敗者樹的時候初始化b[...]和ls[...],

b[0]~b[k-1]為k路的第一個元素,即為敗者樹的葉子節點,ls[0]~ls[k-1]儲存的為每次比賽的失敗者。

/**  

* 已知b[0]到b[k-1]為完全二叉樹ls的葉子結點,存有k個關鍵字,沿從葉子  

* 到根的k條路徑將ls調整成為敗者樹。 

*/ 

void CreateLoserTree(LoserTree ls){  

    int i; 

    b[k] = MINKEY; 

    /* 設定ls中“敗者”的初值 */ 

    for(i = 0; i < k; ++i){ 

        ls[i] = k;  

    } 

    /* 依次從b[k-1],b[k-2],…,b[0]出發調整敗者 */ 

    for(i = k - 1; i >= 0; --i){ 

        Adjust(ls, i); 

    } 

}

/* 沿從葉子結點b[s]到根結點ls[0]的路徑調整敗者樹。*/ 

void Adjust(LoserTree ls, int s){  

    int i, t;   

    /* ls[t]是b[s]的雙親結點 */ 

    t = (s + k) / 2;  

    while(t > 0){ 

        /* s指示新的勝者 */ 

        if(b[s] > b[ls[t]]){ 

            i = s; 

            s = ls[t];  

            ls[t] = i; 

        } 

        t = t / 2; 

    } 

    ls[0] = s; 

}

第一次調整:

由程式可以,先找到葉子節點的父節點,t = (s + k) / 2 = 3 ;  s為3),

 while(t > 0){ 

        /* s指示新的勝者 */ 

        if(b[s] > b[ls[t]]){ 

            i = s; 

            s = ls[t];  

            ls[t] = i; 

        } 

        t = t / 2; 

    } 

b[ls[t=3]] = b[k] = MINKEY < b[s] = b[3] 則交換ls[t]=k和s=3,然後t除以2,t/2 = 1, b[ls[1]] = b[k] = MINKEY ,b[s=k]=MINKEY,直到跳出迴圈,然後 ls[0] = s; 由於ls[0] = s = k,所有不變;

由第二路歸併樹程式進入調整函式,找到父節點為3,然後就是b[2]和b[3]比較,b[3] = 9勝出,則留在ls[3] = 2,進入下一層的為ls[1] = 3;

由第一路歸併樹進入調整函式,找到父節點為2,然後是b[1]和b[k=4]比較由於b[4]為最小值,所有b[4]勝出,b[1]失敗留在父節點ls[2] = 1,勝者進入上一層與ls[1]比較,很明顯b[4]為最小值勝出到達ls[0],留在ls[1] = 3;

由第一路歸併樹進入調整樹,先找到父節點2,然後與父節點比較b[0]勝出,b[1]依舊留在ls[2],繼續上一層的比較直到為上圖為止。

我們通過對建立敗者樹的分析可以知道,程式利用初始化敗者樹全為第k路,一個不存在的一路歸併樹,並且置第k路的值b[k]為最小值,這是為了讓它在每次比較中都能勝出,讓第一次比較的值留在失敗者的位置,第二次比較的時候自然就跟下一路比較了,這樣設計可以減少程式設計的特殊性,避免了特殊情況的出現。建立好敗者樹後,就可以利用敗者樹的性質來進行判斷了。

實現程式碼:(為了防止歸併段變為空的情況,我們將每路歸併段最後都加入了一個最大元素)

#include <stdio.h> 

#include <stdlib.h> 

#include <string.h> 

 

#define TRUE 1 

#define FALSE 0 

#define OK 1 

#define ERROR 0 

#define INFEASIBLE -1 

#define MINKEY -1 

#define MAXKEY 100 

 

/* Status是函式的型別,其值是函式結果狀態程式碼,如OK等 */ 

typedef int Status;  

 

/* Boolean是布林型別,其值是TRUE或FALSE */ 

typedef int Boolean; 

/* 一個用作示例的小順序表的最大長度 */ 

#define MAXSIZE 20  

 

typedef int KeyType;

/* k路歸併 */ 

#define k 3  

 

/* 設輸出M個數據換行 */ 

#define M 10  

 

/* k+1個檔案指標(fp[k]為大檔案指標),全域性變數 */ 

FILE *fp[k + 1];  

 

/* 敗者樹是完全二叉樹且不含葉子,可採用順序儲存結構 */ 

typedef int LoserTree[k];  

 

typedef KeyType ExNode, External[k+1];  

 

/* 全域性變數 */ 

External b;  

 

/* 從第i個檔案(第i個歸併段)讀入該段當前第1個記錄的關鍵字到外結點 */ 

int input(int i, KeyType *a){ 

    int j = fscanf(fp[i], "%d ", a); 

    if(j > 0){ 

        printf("%d\n", *a); 

        return 1; 

    }else{ 

        return 0; 

    } 

} 

 

/* 將第i個檔案(第i個歸併段)中當前的記錄寫至輸出歸併段 */ 

void output(int i){ 

    fprintf(fp[k], "%d ", b[i]); 

} 

 

/* 沿從葉子結點b[s]到根結點ls[0]的路徑調整敗者樹。*/ 

void Adjust(LoserTree ls, int s){  

    int i, t; 

     

    /* ls[t]是b[s]的雙親結點 */ 

    t = (s + k) / 2;  

    while(t > 0){ 

        /* s指示新的勝者 */ 

        if(b[s] > b[ls[t]]){ 

            i = s; 

            s = ls[t];  

            ls[t] = i; 

        } 

        t = t / 2; 

    } 

    ls[0] = s; 

} 

 

/**  

* 已知b[0]到b[k-1]為完全二叉樹ls的葉子結點,存有k個關鍵字,沿從葉子  

* 到根的k條路徑將ls調整成為敗者樹。 

*/ 

void CreateLoserTree(LoserTree ls){  

    int i; 

    b[k] = MINKEY; 

     

    /* 設定ls中“敗者”的初值 */ 

    for(i = 0; i < k; ++i){ 

        ls[i] = k;  

    } 

     

    /* 依次從b[k-1],b[k-2],…,b[0]出發調整敗者 */ 

    for(i = k - 1; i >= 0; --i){ 

        Adjust(ls, i); 

    } 

} 

 

/**  

* 利用敗者樹ls將編號從0到k-1的k個輸入歸併段中的記錄歸併到輸出歸併段。  

* b[0]至b[k-1]為敗者樹上的k個葉子結點,分別存放k個輸入歸併段中當前記錄的關鍵字。  

*/ 

void K_Merge(LoserTree ls, External b){  

    int i, q; 

     

    /* 分別從k個輸入歸併段讀人該段當前第一個記錄的關鍵字到外結點 */ 

    for(i = 0; i < k; ++i) { 

        input(i, &b[i]); 

    } 

     

    /* 建敗者樹ls,選得最小關鍵字為b[ls[0]].key */ 

    CreateLoserTree(ls);  

     

    while(b[ls[0]] != MAXKEY){ 

        /* q指示當前最小關鍵字所在歸併段 */ 

        q = ls[0];  

         

        /* 將編號為q的歸併段中當前(關鍵字為b[q].key)的記錄寫至輸出歸併段 */ 

        output(q);  

         

        /* 從編號為q的輸入歸併段中讀人下一個記錄的關鍵字 */ 

        if(input(q, &b[q]) > 0){ 

            /* 調整敗者樹,選擇新的最小關鍵字 */ 

            Adjust(ls,q);  

        }  

    } 

     

    /* 將含最大關鍵字MAXKEY的記錄寫至輸出歸併段 */ 

    output(ls[0]);  

} 

 

void show(KeyType t) { 

    printf("(%d)", t); 

} 

 

int main(){ 

    KeyType r; 

    int i, j; 

    char fname[k][4], fout[5] = "out", s[3]; 

    LoserTree ls; 

     

    /* 依次開啟f0,f1,f2,…,k個檔案 */ 

    for(i = 0; i < k; i++){  

        /* 生成k個檔名f0,f1,f2,… */ 

        itoa(i, s, 10);  

        strcpy(fname[i], "f"); 

        strcat(fname[i], s); 

         

        /* 以讀的方式開啟檔案f0,f1,… */ 

        fp[i] = fopen(fname[i], "r");  

        printf("有序子檔案f%d的記錄為:\n",i); 

         

        /* 依次將f0,f1,…的資料讀入r */ 

        do{ 

            j = fscanf(fp[i], "%d ", &r); 

            /* 輸出r的內容 */ 

            if(j == 1){ 

                show(r);  

            } 

        }while(j == 1); 

        printf("\n"); 

         

        /* 使fp[i]的指標重新返回f0,f1,…的起始位置,以便重新讀入記憶體 */ 

        rewind(fp[i]);  

    } 

     

    /* 以寫的方式開啟大檔案fout */ 

    fp[k] = fopen(fout, "w");  

     

    /* 利用敗者樹ls將k個輸入歸併段中的記錄歸併到輸出歸併段,即大檔案fout */ 

    K_Merge(ls, b);  

     

    /* 關閉檔案f0,f1,…和檔案fout */ 

    for(i = 0; i <= k; i++){ 

        fclose(fp[i]);  

    } 

     

    /* 以讀的方式重新開啟大檔案fout驗證排序 */ 

    fp[k] = fopen(fout, "r");  

    printf("排序後的大檔案的記錄為:\n"); 

     

    i = 1; 

    do{ 

        /* 將fout的資料讀入r */ 

        j = fscanf(fp[k], "%d ", &r); 

 

        /* 輸出r的內容 */ 

        if(j == 1){ 

            show(r);  

        } 

         

        /* 換行 */ 

        if(i++ % M == 0){ 

            printf("\n");  

        } 

    }while(j == 1); 

    printf("\n"); 

     

    /* 關閉大檔案fout */ 

    fclose(fp[k]);  

    return 0;

} 


測試資料:注意在每個檔案後面都應該加一個哨兵,即一個最大值f0: 10 15 16 100 f1: 9 18 20 100f2: 20 22 40 100out: 9 10 15 16 18 20 20 22 40 100 

參考文獻:

[1] http://baike.baidu.com/view/1368718.htm

[2] http://blog.csdn.net/nomad2/archive/2007/12/15/1940266.aspx