1. 程式人生 > >多執行緒技術模擬平行計算之二:陣列字首和(Prefix Sum)

多執行緒技術模擬平行計算之二:陣列字首和(Prefix Sum)

一、字首和(Prefix Sum)定義:

給定一個數組A[1..n],字首和陣列PrefixSum[1..n]定義為:PrefixSum[i] = A[0]+A[1]+...+A[i-1];

例如:A[5,6,7,8] --> PrefixSum[5,11,18,26]

PrefixSum[0] =A[0] ;

PrefixSum[1] =A[0] + A[1] ; 

PrefixSum[2] =A[0] + A[1] + A[2] ;

PrefixSum[3] =A[0] + A[1] + A[2] + A[3] ;

二、序列程式(Sequential Program):

時間複雜度:O(n);空間複雜度:O(n)。n是陣列A的長度。

虛擬碼:

Procedure PS(A,n,S)
    S[0] = A[0];
    if n = 1 then exit;
    else { for j = 1 to n do S[j] = S[j-1] +A[j] }</span>


序列程式碼sequence.c:

#include<stdio.h>
#include<time.h>
#include<stdlib.h>
#include<unistd.h>
#include<memory.h>
 
#define n 1000
#define RANGE 2
 
int A[n]={0};
intprefix_sum[n]={0};
int num;
 
void work()
{
    int i ;
    prefix_sum[0]=A[0];
    for(i=1;i<num;i++)
    {
        prefix_sum[i] = prefix_sum[i-1]+A[i];
    }
}
 
void generate()
{
    FILE *file;
    if( (file=fopen("/home/huawei/leaguenew/parallel_computing/A.txt","wt"))==NULL )
    {
        perror("open fail\n");
        exit(1);
    }
    int i;
    srand((unsigned)time(NULL)) ;
    for( i =0 ; i < num ; i++ )
       fprintf(file,"%-4d",rand()%RANGE);
    //fprintf(file,"\n");
    fclose(file);
}
 
 
void Read()
{
    FILE *file;
    if( (file =fopen("/home/huawei/leaguenew/parallel_computing/A.txt","rt"))==NULL )
    {
        perror("open fail\n");
        exit(1);
    }
    int i;
    srand((unsigned)time(NULL));
    for(i=0;i<num;i++)
        fscanf(file,"%d",&A[i]);
    fclose(file);
}
 
int main()
{
    printf("please input the scale ofinput :  ");
    scanf("%d",&num);
    generate();
    Read();
    int h;
 
    clock_t start = clock();
    work();
    clock_t finish = clock();
    printf("the time cost is : %.6fseconds\n",(double)(finish-start)/CLOCKS_PER_SEC);
    
    for(h=0;h<num;h++)
        printf("%d ",A[h]);
    printf("\nThe prefix sum is :\n");
    for(h=0;h<num;h++)
        printf("%d ",prefix_sum[h]);
    printf("\n");
    return 0;
}

程式碼中的檔案路徑需要自行修改。

遞迴實現:

#include<iostream>
using namespace std;
 
int ps[10];
 
int prefix_sum(inta[] ,int n)
{
    if( n == 0 )
    {
        ps[0] = a[0];
        return ps[0];
    }
    ps[n] =  a[n]+prefix_sum(a,n-1);  
    return ps[n];
}
 
int main()
{
    int a[] = {3,1,2,5,7,4,9};   
    int len = sizeof(a)/sizeof(a[0]);
    prefix_sum(a,len-1) ;
    
    for(int i=0;i<len;i++)
    {
        cout<<ps[i]<<"";       
    }
    cout<<endl;
    system("pause");
    return 0;
}


三、並行演算法

3.1.Non RecursivePrefix_Sum

首先讓A[1..n]來表示n個整數的陣列。讓B[h][j]作為輔助資料在二叉樹的前向遍歷中儲存中間資訊(後面會說明方法),其中1<=h<=log2(n) and 1<=j<=n/2^h,然而陣列C被用作後向遍歷樹下存放結果。

Input:大小為n=2^h的一個數組A,h是一個非負整數

Output:矩陣C存放結果,C[0][j]就是字首和的第j項,1<=j<=n

虛擬碼:

begin
for 1<=j<=n <strong>pardo</strong>
    set B[0,j] = A[j];
 
for h = 1 to log2(n) do
    for 1<=j<=n/2^h <strong>pardo</strong>
        set B[h,j] = B[h-1,2*j-1] + B[h-1,2*j];
 
for h = log2(n) to 0 do
    for 1<=j<n/2^h <strong>pardo</strong>
    {
        j is even : set C[h,j] = C[h+1,j/2];
        j = 1     : set C[h,1] = B[h,1];
        j is odd  : set C[h,j] = C[h+1,(j-1)/2] + B[h,j];
    }
end
說明:虛擬碼中的pardo是parallel do的意思,即並行處理。


                                         Data Flow(Forward)

執行時間:並行的執行時間就是O(log2(n))其實也就是樹的高度,每一層並行一次時間複雜度為O(1)。

注意:這是一個前向遍歷,從底而上。對於C的後向遍歷同樣的,區別在於從上而下。

 

                                           DataFlow(Backward)

陣列C的生成根據上述虛擬碼生成,最終的C[0][j](1<=j<=n)就是最後的字首和的結果。

上程式碼:

#include<stdio.h>
#include<time.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
#include<memory.h>
#include<stdlib.h>
#include<math.h>

#define RANGE 99
#define M 1000

void generate(int n);
void Read(int n);
void *func1(void *arg);
void *func2(void *arg);
void *func3(void *arg);
void calculate();
void resultToFile();

int A[M];
int B[20][1000];
int prefix_sum[20][1000]={0};
pthread_t tids[20][1000];
pthread_mutex_t mutex;
int n;

struct S
{
	int i ;
	int j ;
};

int main()
{
	printf("please input the scale of input :  ");
	scanf("%d",&n);
	generate(n);
	Read(n);

	clock_t  start = clock();
	calculate();
	resultToFile();
	clock_t finish = clock();

	printf("the time cost is : %.6f seconds\n",(double)(finish-start)/CLOCKS_PER_SEC);
	return 0;
}


void calculate()
{
	int h,i,j,k,l,flag;
	/*======set B[0][j]=A[j] in parallel==================*/
	for(i=1;i<=n;i++)
	{
		struct S *s;
		s = (struct S *)malloc(sizeof(struct S));
		s->i = i;
		if( pthread_create(&tids[0][i],NULL,func1,s) )
		{
			perror("Fail to create thread!");			
			exit(1);
		}
	}
	for(j=1;j<=n;j++)
		pthread_join(tids[0][j],NULL);

	/***Show the B[0][j]***************************/
#if 1
	printf("Show the A[i]:   ");
	for(j=1;j<=n;j++)
		printf("%d ",A[j]);
	printf("\nShow the B[0][j]:");
	for(j=1;j<=n;j++)
		printf("%d ",B[0][j]);
	printf("\n");
#endif
	/***********************************************/

	/**Data Flow Forward Algorithm:build the tree***/
	for(h=1;h<=log2(n);h++)
	{
		for(j=1;j<=n/pow(2,h);j++)
		{
			struct S *s;
			s = (struct S*)malloc(sizeof(struct S));
			s->i = h;
			s->j = j;
			if( pthread_create(&tids[h][j],NULL,func2,s) )
			{
				perror("sth is wrong");
				exit(1);
			}
		}
		/*****wait all the threads terminate to continue*****/
		for(j=1;j<=n/pow(2,h);j++)
			pthread_join(tids[h][j],NULL);
	}
	
#if 1
	/******Print Data Flow Forward************/
	printf("Data Flow Forward:\n");
	for(h=0;h<=log2(n);h++)
	{
		for(l=1;l<=2*h+1;l++)
			printf(" ");
		for(j=1;j<=n/pow(2,h);j++)
		{
			printf("%d",B[h][j]);
			for(l=1;l<=1+h;l++)
				printf(" ");
		}
		printf("\n");
	}
#endif
	
	/*********************************************/

	/***********Data Flow Backward:Traverse*******/
	for(h=log2(n);h>=0;h--)	
	{
		for(j=1;j<=n/pow(2,h);j++)
		{
			struct S *s;
			s = (struct S*)malloc(sizeof(struct S));
			s->i = h;
			s->j = j;
			if( pthread_create(&tids[h][j],NULL,func3,s) )
			{
			perror("sth is wrong");
			exit(1);
			}	
		}
		for(j=1;j<=n/pow(2,h);j++)
			pthread_join(tids[h][j],NULL);
	}

#if 1
	/********Print Data Flow Backward************/
	printf("Data Flow Backward:\n");
	for(h=log2(n);h>=0;h--)	
	{
		for(l=1;l<=2*h+1;l++)
			printf(" ");
		for(j=1;j<=n/pow(2,h);j++)
		{
			printf("%d ",prefix_sum[h][j]);
			for(l=1;l<=h;l++)
				printf(" ");
		}	
		printf("\n");
	}
#endif
	/*********************************************/
}

void resultToFile()
{
	int i;
	/***********write the result to file**********/		
	FILE *file	;
	file = fopen("/home/leaguenew/parallel_computing/prefixSumResult.txt","wt");
	for(i=1;i<=n;i++)
	{
		fprintf(file,"%-6d",prefix_sum[0][i]);
	}
	/*********************************************/		

}

void *func1(void *arg)//set B[0][j]=A[j]
{
	int i;
	struct S *p;
	p = (struct S*)arg;
	i = p->i;
	pthread_mutex_lock(&mutex);
	B[0][i]=A[i];
	pthread_mutex_unlock(&mutex);
	pthread_exit(NULL);
}

void *func2(void *args)//B[h][j]=B[h-1][2*j-1]+B[h-1][2*j];
{
	int h , j;
	struct S *p;
	p = (struct S*)args;
	h = p->i;
	j = p->j;
	pthread_mutex_lock(&mutex);
	B[h][j]=B[h-1][2*j-1]+B[h-1][2*j];
	pthread_mutex_unlock(&mutex);
	pthread_exit(NULL);
}

void *func3(void *arg)//get the prefix sum
{
	int h,j;
	struct S *p;
	p = (struct S*)arg;
	h = p->i;
	j = p->j;
	pthread_mutex_lock(&mutex);
	if(j==1) prefix_sum[h][1] = B[h][1];
	else if (j%2==0)  prefix_sum[h][j] = prefix_sum[h+1][j/2] ;
	else prefix_sum[h][j] = prefix_sum[h+1][(j-1)/2] + B[h][j] ;
	pthread_mutex_unlock(&mutex);
	pthread_exit(NULL);
}

void generate(int n)
{
	FILE *file1;
	if( (file1=fopen("/home/leaguenew/parallel_computing/arrayA.txt","wt") )==NULL )
	{
		perror("fopen");
		exit(1);
	}
	
	int i,j;
	srand( (unsigned)time(NULL) );
	for( i = 1 ; i <= n ;i++)
		fprintf(file1,"%-8d",rand()%RANGE);
	fprintf(file1,"\n");
	fclose(file1);
}

void Read(int n)
{
	FILE *file1;
	if( (file1=fopen("/home/leaguenew/parallel_computing/arrayA.txt","rt") )==NULL )
	{
		perror("fopen");
		exit(1);
	}
	int i,j;
	srand( (unsigned)time(NULL) );
	for( i = 1 ; i <= n ;i++)
		fscanf(file1,"%d",&A[i]);
	fclose(file1);
}

程式的流程是:

Main()
{
       Generate();//生成隨機資料
       Read();//讀取隨機資料到陣列A中
       Calculate()
       {    
              Pthread_create(func1);
              Pthread_create(func2);
              Pthread_create(func3);
       }
       ResultToFile();//把結果寫入檔案
}
Func1();//計算:B[0][j] = A[j]
Func2();//計算:B[h][j]=B[h-1][2*j-1]+B[h-1][2*j]
Func3();//計算:C[i][j]

描述:首先通過隨機數生成一組隨機數,存放到檔案中去,然後通過讀取檔案中的資料到陣列A中。然後開始計時,呼叫計算函式calculate,建立多執行緒計算:

<span style="font-family:Microsoft YaHei;font-size:14px;">B[0][j] = A[j];
B[h][j]=B[h-1][2*j-1]+B[h-1][2*j];
{    
     if(j==1) prefix_sum[h][1]= B[h][1];
     elseif (j%2==0)  prefix_sum[h][j] =prefix_sum[h+1][j/2] ;
     elseprefix_sum[h][j] = prefix_sum[h+1][(j-1)/2] + B[h][j] ;
}</span>

最後輸出結果到檔案。


結果截圖:

n=2:


n=4:


n=8:


n=16:


n=500:


n=600:


n=800:


3.2.Get rid of the 2-D array

這是計算prefix sum的另外一種演算法,去除二維陣列。

虛擬碼:

in:A[1..n],out:Prefix[1..n]

for 1<=i<=n do in parallel
    Prefix[i] = A[i-1] + A[i];
end in parallel
k = 2:
while(k<n) do
    for k+1<=i<=n do in parallel
        Prefix[i] = Prefix[i-k] + Prefix[i]
    end in parallel
    k = k + k;
end while 
程式碼:
#include<stdio.h>
#include<time.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
#include<memory.h>
#include<stdlib.h>
#include<math.h>

#define RANGE 99
#define M 1000

void generate();
void Read();
void *func1(void *arg);
void *func2(void *arg);
void calculate();
void resultToFile();

int A[M]={0};
int prefix_sum[M]={0};
int temp[M]={0};
int n;
pthread_t tids[1000][1000];
pthread_mutex_t mutex;

struct S
{
	int i ;
	int j ;
};

int main()
{
	printf("please input the scale of input : ");
	scanf("%d",&n);
	generate();
	Read();

	clock_t start = clock();
	calculate();
	resultToFile();
	clock_t finish = clock();
	
	printf("The time cost is : %.6f second\n",(double)(finish-start)/CLOCKS_PER_SEC);
	return 0 ;
}

void resultToFile()
{
	int i;
	/***********write the result to file**************/
	FILE *file;
	file = fopen("/home/leaguenew/parallel_computing/GetRidOf2DArray.txt","wt");
	for(i=1;i<=n;i++)
	{
		fprintf(file,"%-6d",prefix_sum[i]);
	}
	/*************************************************/
}

void calculate()
{
	int h,i,j,k,l ;
	/************************************************/
	for(i=1;i<=n;i++)
	{
		struct S* s;
		s = (struct S*)malloc(sizeof(struct S));
		s->i = i;
		if( pthread_create(&tids[0][i],NULL,func1,s))
		{
			perror("Fail to create thread!");
			exit(1);
		}
	}
	for(j=1;j<=n;j++)
		pthread_join(tids[0][j],NULL);
	/************************************************/

	k=2;
	while(k<n)
	{
		for(i=1;i<=n;i++)
			temp[i] = prefix_sum[i];
		for(i=k+1;i<=n;i++)
		{
			struct S* s;
			s = (struct S*)malloc(sizeof(struct S));
			s->i = i;
			s->j = k;
			if( pthread_create(&tids[k][i],NULL,func2,s))
			{
				perror("Fail to create thread!");
				exit(1);
			}
		}
		for(j=k+1;j<=n;j++)
			pthread_join(tids[k][j],NULL);

		for(i=1;i<=n;i++)
			prefix_sum[i] = temp[i];
		k = k + k;
	}

#if 0
	/************show the result***************/
	printf("A: \n");
	for(i=1;i<=n;i++)
		printf("%d ",A[i]);
	printf("\nprefix_sum:\n");
	for(i=1;i<=n;i++)
		printf("%d ",prefix_sum[i]);
	printf("\n");
	/******************************************/
#endif
	
}

void *func1(void *arg)
{
	int i ;
	struct S *p;
	p = (struct S*)arg;
	i = p->i;
	pthread_mutex_lock(&mutex);
	if(i==1)  prefix_sum[i] = A[i];
	else	  prefix_sum[i] = A[i-1] + A[i];
	pthread_mutex_unlock(&mutex);
	pthread_exit(NULL);
}

void *func2(void *arg)
{
	int i ,k ;
	struct S *p;
	p = (struct S*)arg;
	i = p->i;
	k = p->j;
	pthread_mutex_lock(&mutex);
	temp[i] = prefix_sum[i-k] + prefix_sum[i];
	pthread_mutex_unlock(&mutex);
	pthread_exit(NULL);
}


void generate()
{
	FILE *file;
	if( (file = fopen("/home/leaguenew/parallel_computing/arrayB.txt","wt"))==NULL )
	{
		perror("fopen");
		exit(1);
	}
	int i , j ;
	srand((unsigned)time(NULL) );
	for( i=1;i<=n;i++ )
		fprintf(file,"%-6d",rand()%RANGE);
	fprintf(file , "\n");
	fclose(file);
}

void Read()
{
	FILE *file;
	if( (file = fopen("/home/leaguenew/parallel_computing/arrayB.txt","rt"))==NULL )
	{
		perror("fopen");
		exit(1);
	}
	int i , j ;
	srand((unsigned)time(NULL) );
	for( i=1;i<=n;i++ )
		fscanf(file,"%d",&A[i]);
	fclose(file);
}

程式的流程是:
Main()
{
	Generate();//生成隨機資料寫入檔案
	Read();//讀取檔案中的資料到陣列中
	Calculate()
{	
Pthread_create(func1);
		Pthread_create(func2);
}
	ResultToFile();//把結果寫入檔案
}
Func1();//計算:prefix_sum[i] = A[i-1] + A[i];
Func2();//計算:prefix[i] = prefix_sum[i-k] + prefix_sum[i];<strong>

結果截圖:

n=4:


n=6:

n=16:



四、總結

a.pthread的連結庫問題:編譯的時候要新增–lpthread。

b.Scanf引數沒有新增&造成段錯誤。

c.通過列印除錯除錯程式,及列印相關的輸出語句來判斷哪個位置發生錯誤。

d.sleep()不佔計算clock()時間。

f.在UNIX和LINUX系統中在要命令中加入-lm這個功能是把數學庫與源程式連結在一起。

g.Pthread_join的理解,等待所有建立的執行緒結束後繼續進行。

h.通過動態建立結構體來給pthread_create()傳遞多個引數。