1. 程式人生 > >Python多程序處理:如何將大量資料放入有限記憶體

Python多程序處理:如何將大量資料放入有限記憶體

簡介

這是一篇有關如何將大量的資料放入有限的記憶體中的簡略教程。

與客戶工作時,有時會發現他們的資料庫實際上只是一個csv或Excel檔案倉庫,你只能將就著用,經常需要在不更新他們的資料倉庫的情況下完成工作。大部分情況下,如果將這些檔案儲存在一個簡單的資料庫框架中或許更好,但時間可能不允許。這種方法對時間、機器硬體和所處環境都有要求。

下面介紹一個很好的例子:假設有一堆表格(沒有使用Neo4j、MongoDB或其他型別的資料庫,僅僅使用csvs、tsvs等格式儲存的表格),如果將所有表格組合在一起,得到的資料幀太大,無法放入記憶體。所以第一個想法是:將其拆分成不同的部分,逐個儲存。這個方案看起來不錯,但處理起來很慢。除非我們使用多核處理器。

目標

這裡的目標是從所有職位中(大約1萬個),找出相關的的職位。將這些職位與政府給的職位程式碼組合起來。接著將組合的結果與對應的州(行政單位)資訊組合起來。然後用通過word2vec生成的屬性資訊在我們的客戶的管道中增強已有的屬性。

這個任務要求在短時間內完成,誰也不願意等待。想象一下,這就像在不使用標準的關係型資料庫的情況下進行多個表的連線。

資料

職位資料

referencenumbertitlepostdateurlcompanycitystatedescription
1652398203Sales Associate2014-07-09 13:47:18URL linkCompany NameCityStateOur Sales Associates are…

“表格太長,請到原文檢視。”

標題資料

IDTitle
82Pediatricians, General

OES資料

areaarea_titlearea_typenaicsnaics_titleown_code後略…
99U.S.1000000Cross-industry123500-0000

“表格太長,請到原文檢視。”

SOC表

2010 SOC Code2010 SOC Title2010 SOC Direct Match Titlellustrative Example
11-1011Chief ExecutivesCEO

示例指令碼

下面的是一個示例指令碼,展示瞭如何使用multiprocessing來在有限的記憶體空間中加速操作過程。指令碼的第一部分是和特定任務相關的,可以自由跳過。請著重關注第二部分,這裡側重的是multiprocessing引擎。

Python
1234567891011121314151617181920212223242526272829303132333435363738394041424344#import the necessary packagesimportpandas aspdimportusimportnumpy asnpfrommultiprocessingimportPool,cpu_count,Queue,Manager# the data in one particular column was number in the form that horrible excel version # of a number where '12000' is '12,000' with that beautiful useless comma in there. # did I mention I excel bothers me?# instead of converting the number right away, we only convert them when we need todefmedian_maker(column):returnnp.median([int(x.replace(',',''))forxincolumn])# dictionary_of_dataframes contains a dataframe with information for each title; e.g title is 'Data Scientist'# related_title_score_df is the dataframe of information for the title; columns = ['title','score'] ### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871# code_title_df contains columns ['code','title']# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!)defjob_title_location_matcher(title,location):try:related_title_score_df=dictionary_of_dataframes[title]# we limit dataframe1 to only those related_titles that are above # a previously established thresholdrelated_title_score_df=related_title_score_df[title_score_df['score']>80]#we merge the related titles with another table and its codescodes_relTitles_scores=pd.merge(code_title_df,related_title_score_df)codes_relTitles_scores=codes_relTitles_scores.drop_duplicates()# merge the two dataframes by the codesmerged_df=pd.merge(codes_relTitles_scores,oes_data_df)#limit the BLS data to the state we wantall_merged=merged_df[merged_df['area_title']==str(us.states.lookup(location).name)]#calculate some summary statistics for the time we wantgroup_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90=all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker)row=[title,location,group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90]#convert it all to strings so we can combine them all when writing to filerow_string=[str(x)forxinrow]returnrow_stringexcept:# if it doesnt work for a particular title/state just throw it out, there are enough to make this insignificant'do nothing'

這裡發生了神奇的事情:

Python
1234567891011121314151617181920212223242526272829303132333435363738394041#runs the function and puts the answers in the queuedefworker(row,q):ans=job_title_location_matcher(row[0],row[