curl多執行緒大批量分片下載大檔案原始碼示例
這段時間,一直在探索使用curl多執行緒來下載一系列的大檔案的可行性方法。下面是我探索的結果:
1.將大檔案分為許多小片段,比如20M一個片段(當然這個值可以配置,比如100M一個片段,取決於你的業務需求),使用http range來下載這些片段;
2.使用預先生成的執行緒池來連續不間斷地執行檔案片段的下載,這個執行緒池也就是一個固定執行緒個數的普通執行緒池,使用互斥鎖和訊號量來保護任務佇列,各worker執行緒搶佔式獲取要下載的任務;
3.檔案的下載使用磁碟檔案形式,當然你也可以修改為儲存在記憶體中。
4.由於easy handle不能跨執行緒呼叫,我們只能是一個執行緒函式裡一個curl easy handle,用完就釋放掉。
5.如果是小檔案(2M以內的,比如html,img之類的),建議使用單執行緒中的curl multi handle的非同步呼叫架構,後續我會將相關程式碼放到網上。
由於一個檔案分為多個分片,每個分片可能被不同的執行緒下載,每個分片完成的次序並不固定,這和Bittorrent有些相似,所以,檔案的關閉是個需要審慎考慮的問題。怎樣處理,才能保證所有的檔案全都及時安全關閉呢?及時的要求是,整個檔案一下載,最好就把它關閉,以免佔用檔案描述符。
在目前demo的設計中,我還沒有找到一個很好的思路來將它們及時關閉。歡迎大牛指點良策。
原始碼示例的功能:
目前的程式碼中,設定了是否啟用http range下載的巨集開關,同時定義了每個分片大小是20M。在開啟range分片下載時,一個63M的檔案,將會分為 63 /20 = 3 + 1 =4個分片。因而線上程池中可能會被4個執行緒下載。而目前執行緒池我設定了30個執行緒。
如果沒有開啟range分片下載,就是一個執行緒下載一個檔案,這個檔案有可能是2M,也有可能是400M,還有可能是2.8G,所以會造成大多數執行緒空閒,個別執行緒累死,這對合理高效利用計算資源是不利的,所以,我提倡儘可能使用range下載大檔案。
下面是原始碼
//g++ -g test_pool.cpp thread_pool.cpp -o test_pool -lpthread -lcurl // #include <unistd.h> #include <libgen.h> #include <curl/curl.h> #include <string> #include <vector> #include <map> #include "thread_pool.h" using namespace std; #define HTTP_RANGE 0 //是否開啟http range下載功能? const long seg_size = 20 * 1024 * 1024; //檔案分片閾值大小是100MB, 超過該大小的檔案將會強制分片下載 static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER; map<string, int> download_map; //下載任務的完成情況, key是url或是檔名, val是分片數, 用於主執行緒監視 class CloseInfo { public: CloseInfo(FILE* f, string& s): fp(f), url(s) {} ~CloseInfo(){} public: FILE* fp; string url; }; vector<CloseInfo*> closed_vec; //所有開啟的檔案列表需要最後繼中關閉, 因為在分片下載的情況下,無法保證在所有的分片都下載完成的情況下關閉檔案 //這裡必須是struct而不能是class struct JobInfo{ JobInfo(): fp(NULL), startPos(0), stopPos(0), ranged(false) {} FILE* fp; //本地檔案控制代碼 long startPos; long stopPos; string url; bool ranged; //是否需要http range下載? }; static void set_JobInfo(JobInfo* ji, FILE* f, long s, long e, string u, bool r){ ji->fp = f; ji->startPos = s; ji->stopPos = e; ji->url = u; ji->ranged = r; } long get_download_file_length (const char *url) { double file_len = 0; CURL *handle = curl_easy_init (); curl_easy_setopt (handle, CURLOPT_URL, url); curl_easy_setopt (handle, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt (handle, CURLOPT_MAXREDIRS, 3L); curl_easy_setopt (handle, CURLOPT_AUTOREFERER, 1L); curl_easy_setopt (handle, CURLOPT_HEADER, 0L); //只需要header頭 curl_easy_setopt (handle, CURLOPT_NOBODY, 1L); //不需要body curl_easy_setopt (handle, CURLOPT_FORBID_REUSE, 1); curl_easy_setopt (handle, CURLOPT_USERAGENT, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"); //user-agent if (curl_easy_perform (handle) == CURLE_OK) { curl_easy_getinfo (handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &file_len); } else { file_len = -1; } curl_easy_cleanup(handle); return file_len; } //每個執行緒在下載每個檔案分片時,都會回撥該函式 static size_t write_data(void* ptr, size_t size, size_t nmemb, void* userdata){ JobInfo* ji = (JobInfo*) userdata; bool ranged = ji->ranged; size_t written; //要分片下載的大檔案, 需要設定http range域 if(ranged){ //多執行緒寫同一個檔案, 需要加鎖 pthread_mutex_lock (&g_mutex); if(ji->startPos + size * nmemb <= ji->stopPos){ fseek(ji->fp, ji->startPos, SEEK_SET); written = fwrite(ptr, size, nmemb, ji->fp); ji->startPos += size * nmemb; } else{ fseek(ji->fp, ji->startPos, SEEK_SET); written = fwrite(ptr, 1, ji->stopPos - ji->startPos + 1, ji->fp); ji->startPos = ji->stopPos; } pthread_mutex_unlock (&g_mutex); }else{ written = fwrite(ptr, size, nmemb, ji->fp); } return written; } void* job_process(void* arg){ CURL* curl; CURLcode res; JobInfo* ji = (JobInfo*)arg; char range[64] = { 0 }; if(ji->ranged) { snprintf (range, sizeof(range), "%ld-%ld", ji->startPos, ji->stopPos); printf("range: [%s]\n", range); } curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_URL, ji->url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_data); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void*)ji); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 3L); curl_easy_setopt(curl, CURLOPT_AUTOREFERER, 1L); curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L); curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1L); curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 5L); curl_easy_setopt(curl, CURLOPT_USERAGENT, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"); //user-agent curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 1L); //curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L); if(ji->ranged) curl_easy_setopt(curl, CURLOPT_RANGE, range); res = curl_easy_perform(curl); if(CURLE_OK != res){ printf("[%d] %s\n", res, ji->url.c_str()); } curl_easy_cleanup(curl); //TODO: 回饋下載分片的完成情況.... string url = ji->url; pthread_mutex_lock(&g_mutex); download_map[url]--; printf("[-1]%s\n", url.c_str()); pthread_mutex_unlock(&g_mutex); return NULL; } //從url列表得到一個相應的下載任務列表 int get_job_queue (vector<string>& urls, vector<JobInfo*>& jobs){ FILE* fp; long file_len = 0L, start, stop, seg_num; bool ranged = false; JobInfo* ji; vector<string>::iterator it; for(it = urls.begin(); it != urls.end(); ++it){ string url = *it; file_len = get_download_file_length (url.c_str()); if(file_len <= 0) continue; printf("[%ld] %s\n", file_len, url.c_str()); //對應每個url, 開啟一個本地檔案 const char* fn = basename((char*)url.c_str()); string full_path(fn); full_path = "./" + full_path; //設為當前路徑下面 fp = fopen(full_path.c_str(), "wb"); //在此統一開啟檔案 if(NULL == fp) continue; //構造關閉檔案向量 CloseInfo* ci = new CloseInfo(fp, url); closed_vec.push_back(ci); //加入全域性任務監聽對映 int additional = (file_len % seg_size == 0) ? 0 : 1; int seg_total = ranged ? (file_len < seg_size ? 1 : (file_len/seg_size + additional)) : 1; download_map[url] = seg_total; printf("[+1]%s, seg total %d\n", url.c_str(), seg_total); #if HTTP_RANGE //根據檔案大小, 確定是否分片? if(file_len < seg_size){ start = 0; stop = file_len - 1; ji = new JobInfo(); set_JobInfo(ji, fp, start, stop, url, ranged); jobs.push_back(ji); } else{ //分片下載,先確定分片個數 ranged = true; seg_num = (long)file_len / seg_size; printf("filesize[%ld], segsize[%ld], seg num: %ld\n", file_len, seg_size, seg_num); for(int i = 0; i <= seg_num; i++){ if(i < seg_num){ start = i * seg_size; stop = (i + 1) * seg_size - 1; } else{ if(file_len % seg_size != 0){ start = i * seg_size; stop = file_len - 1; }else break; } ji = new JobInfo(); set_JobInfo(ji, fp, start, stop, url, ranged); jobs.push_back(ji); } } #else start = 0; stop = file_len - 1; ji = new JobInfo(); set_JobInfo(ji, fp, start, stop, url, ranged); jobs.push_back(ji); #endif } return 0; } int main(int argc, char* argv[]){ vector<string> urls_vec; urls_vec.push_back("http://dlsw.baidu.com/sw-search-sp/soft/da/17519/BaiduYunGuanjia_4.8.3.1409021519.exe");//10M // urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-server-ppc64el.template"); //67M // urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04.1-server-amd64+mac.template"); //88M urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04.1-server-powerpc.template"); //64M // urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04.1-server-ppc64el.template"); //67M // urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-desktop-amd64+mac.iso"); //962M urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-desktop-amd64+mac.iso.zsync"); //1.9M // urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-server-amd64+mac.iso"); //566M // urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-server-amd64+mac.template"); //88M // urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-server-powerpc.iso"); //654M //建立執行緒越多, cpu佔用率越大, 下載效率會有一定提高 CThreadPool* pool = new CThreadPool(30); vector<JobInfo*> ji_vec; get_job_queue(urls_vec, ji_vec); vector<JobInfo*>::iterator it; for(it = ji_vec.begin(); it != ji_vec.end(); ++it){ JobInfo* ji = *it; pool->pool_add_job(job_process, (void*)ji); } //這個時間僅是一個粗略值,當有執行緒下載檔案未完成時,這個worker執行緒是不會退出的, master執行緒會一直等待, 直到所有的worker執行緒都退出 //此處更好的思路是統計所有的任務是否下載完成, 再準備退出 //usleep(2000 * 1000 * 1000); //2000 map<string, int>::iterator mit; int finished = 0; size_t map_size = download_map.size(); printf("map size %ld\n", map_size); while(1){ finished = 0; pthread_mutex_lock(&g_mutex); for(mit = download_map.begin(); mit != download_map.end(); ++mit) if(!mit->second) finished++; pthread_mutex_unlock(&g_mutex); if(finished == map_size) { printf("finish all task =====================>\n"); break; } else{ usleep(30 * 1000 * 1000); } } download_map.clear(); delete pool; urls_vec.clear(); //注意所含的jobinfo已經線上程處理完後就刪除了,這裡不用再單獨刪除.JobInfo物件析構的時候,會關閉開啟的檔案. //另外執行緒池銷燬時,會將剩下沒有處理的任務刪除 ji_vec.clear(); //開啟的檔案,要記得全部關閉 vector<CloseInfo*>::iterator cit; for(cit = closed_vec.begin(); cit != closed_vec.end(); ++cit){ CloseInfo* ci = *cit; if(ci->fp != NULL){ fclose(ci->fp); ci->fp = NULL; } printf("[closed] %s\n", ci->url.c_str()); delete ci; } closed_vec.clear(); return 0; }
注意需依賴libcurl和lpthread庫, 要先安裝好libcurl庫.另外,固定執行緒池的原始碼參見我另一篇部落格,這裡僅是替換了test_pool.cpp的程式碼, 改為curl下載檔案.
執行截圖
後續改進的地方:
1.檔案關閉方式
2.個別情況下執行緒函式下載超時的問題