1. 程式人生 > >shmget 共享記憶體 同步讀寫檔案一個程序寫,多個程序讀,讀和寫同步,邊寫邊讀

shmget 共享記憶體 同步讀寫檔案一個程序寫,多個程序讀,讀和寫同步,邊寫邊讀

首先,看看老大給我的任務:實現一個模組間的記憶體管理庫, 實現以下功能

1、該記憶體庫通訊的資料量不確定, 最大5Mbit/s 
2、該記憶體庫用於模組間的資料互動
3、該記憶體庫只允許一個模組寫入, 但可多個模組讀取, 但需要各個讀取模組沒有任何相互干擾, 比如一個模組可能讀取的速度較慢, 一個讀取的速度較快
4、該記憶體庫需要具有一定的資料緩衝, 實際的應用上, 用於視訊幀資料的互動,  
5、封裝成動態庫 我拿到題目後,第一想法是,這很簡單,兩模組雖然不能同時寫,同時讀還是很容易的,接下來就傻了。。。老大要求的是實現同時讀和寫,並且多個讀不影響。

遇到的問題:

1,程序間最好用的通訊方式是共享記憶體,記憶體大且操作容易。

2,資料量不確定是寫入時的檔案大小不確定,那麼怎麼來回改變共享記憶體大小。(可變陣列行不通)

3,多個程序讀,多次建立掛載共享記憶體,銷燬時會出現問題。

4,讀寫不同步,寫完之後讀不是想要的結果。

5,重複性的問題,避免重複讀。

下面詳細介紹我的痛苦歷程:

首先,我們來說說共享記憶體,共享記憶體是程序間通訊的一種很好用的方式,記憶體足夠大,建立銷燬簡單,在前面已經介紹過了,關於建立的函式見連結 http://blog.csdn.net/agoni_xiao/article/details/77165428

接下來,處理檔案大小不穩定的情況,用get_blocksize函式,根據檔案大小改變塊的大小及個數。


我本來想用可變陣列後來發現每次都讀不完整,是因為可變陣列是從地址訪問到地址,如果別的程式用到你的地址,就會出現錯誤。

然後,想使用緩衝區環形和整型,但如果這樣每次都需要開闢的記憶體是把整個檔案寫入共享記憶體,而不是一部分一部分去寫,如果寫一部分,第一個模組讀完並載入新的資料,第二個模組就會讀不到之前的資料,或者讀到部分資料,所以放棄緩衝區的想法。環形緩衝區和整型緩衝區知識http://www.cnblogs.com/zengzy/p/5139582.html

對於讀寫的同步,寫在前,讀在後,每個塊都有一個狀態標誌block_status,為1時表示該塊已寫入完成可讀,為2時表示已讀過。當第一個模組讀完第一個塊之後,繼續讀第二個塊,當讀到第二個塊的時候,把第一個塊狀態置為1,以便後面的模組讀取。圖見下:


解決共享記憶體多次釋放問題,是通過共享記憶體結構體中,儲存建立好的資訊,在下一塊連結時,讀取並判斷。

廢話少說,直接上程式碼.

標頭檔案shm.h:

#include<stdlib.h>
#include<sys/shm.h>
#include<unistd.h>
#include<string.h>
#include<time.h>
#ifndef _SHAREMRY_H_HEADER
#define _SHAREMRY_H_HEADER
#define SIZE 1024*1024
struct share_temp
{
	int read_success;
	int size;
	int block_status[10];
	int is_readed[10];
	int is_empty;
	char text[SIZE];
	int shmid_line;
	void *shm_line;
};

void* rmapi_create_sharememory();
//int get_blockcount(int size,int bsize);
int get_blocksize(int size);
void init_is_readed(struct share_temp **shared);
int rmapi_put_frame(void* memory, const char* data, unsigned int data_size);
int rmapi_get_frame(void* memory,char** data,unsigned int data_size);
void read_file(int size, struct share_temp **shared,char** filedata);
void rmapi_destroy_read_sharememory(void* memory);
void rmapi_destroy_write_sharememory(void* memory);

#endif
讀模組readshm.cpp
#include<iostream>
#include<stdio.h>
#include"shm.h"

using namespace std;
class ReadBlock
{
	public:
		ReadBlock(){}
		~ReadBlock(){}

		void* creatershm()
		{
			shm_line = rmapi_create_sharememory();
			return shm_line;
		}
		//void File_opt();
		
	private:
		//struct share_temp sw;
		void* shm_line;
		
};

int main()
{
	ReadBlock rb;
	int size,shmid_line;
	
	//char* fdata = (char*)malloc(size);
	void* readshm = rb.creatershm();
	struct share_temp *spq =(struct share_temp *)readshm;
	size = spq->size;
	char* fdata = (char*)malloc(size);
	rmapi_get_frame(readshm, &fdata, size);
    rmapi_destroy_write_sharememory(readshm);
}
寫模組writeshm.cpp
#include<iostream>
#include<stdio.h>
#include"shm.h"
using namespace std;
void *rmapi_create_sharememory();
class WriteBlock
{
	public:
		WriteBlock(){}
		~WriteBlock(){}
		
		void *createwshm()
		{
			shm_line = rmapi_create_sharememory();
			return shm_line;
		}
		
		
	private:
		//struct share_temp sw;
		void* shm_line;
		
};

int main(int argc,char *argv[])
{
	WriteBlock wb;
	int count,size,bsize,shmid_line;
	
	FILE *fr;    		
	fr = fopen(argv[1],"a+");
	fseek(fr,0,SEEK_END);
	size = ftell(fr);
	rewind(fr);
	
	void *wshmline = wb.createwshm();
	struct share_temp* write_shm = (struct share_temp *)wshmline;
	char* fdata = (char *)malloc(size);
	fread(fdata,size,1,fr);
	cout<<"the fread fileinfo is"<<fdata<<endl;
	
//	rmapi_put_frame(wshmline, fdata, size);
	
	write_shm->size = size;
	cout<<"the wshmline->size:"<<write_shm->size<<endl;
	rmapi_put_frame(wshmline,fdata,size);
	rmapi_destroy_write_sharememory(wshmline);
	fclose(fr);
}
方法庫sharemrylib.cpp
#include<iostream>
#include<stdlib.h>
#include<unistd.h>
#include<stdio.h>
#include"shm.h"
using namespace std;
void init_is_readed(void **memory)
{
	void *rst = *memory;
	struct share_temp *shared = (struct share_temp *)rst;
	int rf = 0;
	for(rf;rf<10;rf++)
	{
		shared->is_readed[rf] = 0;
		cout<<"init_is_readed"<<rf<<"is ok"<<endl;
	}
	shared = NULL;

}
int get_blocksize(int size)
{
	int bs;
	if(size > 1024*1024)
	{
		bs = 1024*1024;
	}
	if(size > 1024*512 && size < 1024*1024)
	{
		bs = 1024*8;
	}
	if(size >1024 && size < 1024*512)
	{
		bs = 1024;
	}
	if(size < 1024)
	{
		bs = 1024;
	}
	return bs;
}
int get_blockcount(int size,int bsize)
{	
	int count = size/bsize;
	cout<<"the block count is:"<<count<<endl;
	return count;
}
void read_file(int size, struct share_temp **shared,char** filedata)
{	
	char* readdata = *filedata;
	struct share_temp *pos;
	char *readp,*tpbuf;
	int count,remind;
	int di = 0;
	pos = *shared;
	readp = pos->text;
	int bsize = get_blocksize(size);
	cout<<"the bsize is "<<bsize<<endl;
	count = get_blockcount(size,bsize);
	cout<<"the count is"<<count<<endl;
	tpbuf = (char*)malloc(bsize);
	while(di <= count)
	{
		if(pos->block_status[di] > 0)
		{
			switch (pos->is_readed[di])
			{
				case 1:
					sleep(5);
					//cout<<"the  "<<di<<"  block text info is"<<readp<<endl;
					memcpy(tpbuf,readp,bsize);
                
					cout<<"the  "<<di<<"  block text info is"<<tpbuf<<endl;
					memcpy(readdata,tpbuf,bsize);
					memset(tpbuf,0,bsize);
					pos->is_readed[di] = 2;
					pos->is_readed[di-1] = 1;
					readp =readp + bsize;
					di += 1;
					break;
				case 2:
					cout<<"the"<<di<<"block is readed"<<endl;
					pos->is_readed[di] = 1;
					di += 1;
			}
		}
		else
		{
			cout<<"wait the "<<di<<"  write info"<<endl;
			di += 0;
		}
	}	
	cout<<"the di is "<<di<<endl;	
	if(pos->block_status[di] == 1 && pos->is_readed[di] == 1)
	{
		cout<<"the remind info is"<<readp<<endl;
		pos->is_readed[di] = 2;
		memcpy(readdata,readp,remind);
	}
	pos = NULL;
	readp = NULL;
	readdata = NULL;
}

void* rmapi_create_sharememory()
{
	void *shm = NULL;
	int shmid = shmget((key_t)1342, SIZE, 0666|IPC_CREAT);
	if(shmid == -1)
	{		
		cout<<"shmid failed"<<endl;
	}
	shm = shmat(shmid, (void *)0, 0);
	struct share_temp *shared = (struct share_temp*)shm;
	shared->shmid_line = shmid;
	shared->shm_line = shm;
	//framecount = shmid; 
	cout<<"memory attach at "<<(int)shm<<endl;
	if(shm == (void*)-1)
	{
		cout<<"shmat failed"<<endl;
	}
	return shm;
}



int rmapi_put_frame(void* memory, const char* data, unsigned int data_size)
{
	
	struct share_temp* shared = (struct share_temp*)memory;
	
	struct share_temp *post = shared;
	char *dost = post->text;
	int bsize,count;
	int ri = 0;
	if(data_size>1024)
	{
	bsize = get_blocksize(data_size);
	
	count = data_size / bsize;
	int remind = (data_size - count * bsize);
	cout<<"the remind size is:"<<remind<<endl;
	post->is_empty = 1;

	for(ri;ri<count;ri++)
	{
		cout<<"begin write the"<<ri<<"block"<<endl;
		
		memcpy(dost,data,bsize);
		cout<<"block"<<ri<<"the dost is"<<dost<<endl;
		
		post->block_status[ri] = 1;
		post->is_readed[ri] = 1;

		sleep(3);
		dost = dost + bsize;
		data = data + bsize;
		//free(tpbuf);
	}	
	memcpy(dost,data,remind);
	
	cout<<"the remind info is "<<data<<endl;
	cout<<"the remind size is "<<remind<<endl;
	post->block_status[ri] = 1;	
	post->is_readed[ri] = 1;
	}
	else
	{
		memcpy(dost,data,data_size);
		post->block_status[ri] = 1;
	}
	post = NULL;
	dost = NULL;
}

int rmapi_get_frame(void* memory, char** data, unsigned int data_size)
{
	
	struct share_temp* shared = (struct share_temp*)memory;
	
	struct share_temp *post = shared;
	char* filedata = *data;
	int running = 1;	
	while(running)
	{
		if(shared->is_empty == 0)
		{
			cout<<"have read nothing,the shm is empty"<<endl;
			sleep(5);
			continue;
		}
		read_file(data_size,&shared,&filedata);
		
		running = 0;
	}
}

void rmapi_destroy_read_sharememory(void* memory)
{
	struct share_temp* shared = (struct share_temp*)memory;
	int shmid = shared->shmid_line;
	void *shm = shared->shm_line;
	
	if(shmdt(shm) == -1)
	{
		cout<<"shmdt failed"<<endl;
		exit(EXIT_FAILURE);	
	}
	cout<<"shmdt success"<<endl;
	if(shmctl(shmid,IPC_RMID,0) == -1)	
	{			
		cout<<"shmctl failed"<<endl;
		exit(EXIT_FAILURE);	
	}
	cout<<"shmctl success"<<endl;
}
void rmapi_destroy_write_sharememory(void* memory)
{
	struct share_temp* shared = (struct share_temp*)memory;
	int shmid = shared->shmid_line;
	void *shm = shared->shm_line;
	
	if(shmdt(shm) == -1)
	{
		cout<<"shmdt failed"<<endl;
		exit(EXIT_FAILURE);	
	}
	cout<<"shmdt success"<<endl;
}

makefile檔案:

CPP = g++
CPPFLAGS = -fpic
LIB = libsharemry.so
HPATH = /home/xudong/share_memory/
LIBPATH = /home/xudong/share_memory/


edit:sharemry.o libsharemry.so readshm.o writeshm.o readshm readshm2 writeshm 


readshm:
$(CPP) -o readshm readshm.cpp -I $(HPATH) -L $(LIBPATH) -lsharemry 


readshm2:
$(CPP) -o readshm2 readshm2.cpp -I $(HPATH) -L $(LIBPATH) -lsharemry 
writeshm:
$(CPP) -o writeshm writeshm.cpp -I $(HPATH) -L $(LIBPATH) -lsharemry
libsharemry.so:
$(CPP) -shared -o $(LIB) sharemrylib.o 

sharemry.o:
$(CPP) $(CPPFLAGS) -c sharemrylib.cpp

以上就是所有的原始碼。