1. 程式人生 > >python 處理大資料之資料讀取

python 處理大資料之資料讀取

1 參考1:python讀取GB級的文字資料,防止memoryError

Preliminary
  我們談到“文字處理”時,我們通常是指處理的內容。Python 將文字檔案的內容讀入可以操作的字串變數非常容易。檔案物件提供了三個“讀”方法: .read()、.readline() 和 .readlines()。每種方法可以接受一個變數以限制每次讀取的資料量,但它們通常不使用變數。 .read() 每次讀取整個檔案,它通常用於將檔案內容放到一個字串變數中。然而 .read() 生成檔案內容最直接的字串表示,但對於連續的面向行的處理,它卻是不必要的,並且如果檔案大於可用記憶體,則不可能實現這種處理。下面是read()方法示例:


try:
f = open('/path/to/file', 'r')
print f.read()
finally:
if f:
f.close()
  呼叫read()會一次性讀取檔案的全部內容,如果檔案有10G,記憶體就爆了,所以,要保險起見,可以反覆呼叫read(size)方法,每次最多讀取size個位元組的內容。另外,呼叫readline()可以每次讀取一行內容,呼叫readlines()一次讀取所有內容並按行返回list。因此,要根據需要決定怎麼呼叫。
  如果檔案很小,read()一次性讀取最方便;如果不能確定檔案大小,反覆呼叫read(size)比較保險;如果是配置檔案,呼叫readlines()最方便:


for line in f.readlines():
process(line) # <do something with line>
Read In Chunks
  處理大檔案是很容易想到的就是將大檔案分割成若干小檔案處理,處理完每個小檔案後釋放該部分記憶體。這裡用了iter & yield:


def read_in_chunks(filePath, chunk_size=1024*1024):
"""
Lazy function (generator) to read a file piece by piece.
Default chunk size: 1M
You can set your own chunk size
"""
file_object = open(filePath)
while True:
chunk_data = file_object.read(chunk_size)
if not chunk_data:
break
yield chunk_data
if __name__ == "__main__":
filePath = './path/filename'
for chunk in read_in_chunks(filePath):
process(chunk) # <do something with chunk>
Using with open()
  with語句開啟和關閉檔案,包括丟擲一個內部塊異常。for line in f檔案物件f視為一個迭代器,會自動的採用緩衝IO和記憶體管理,所以你不必擔心大檔案。
#If the file is line based
with open(...) as f:
for line in f:
process(line) # <do something with line>
Conclusion
  在使用python進行大檔案讀取時,應該讓系統來處理,使用最簡單的方式,交給直譯器,就管好自己的工作就行了。

參考2:利用python處理兩千萬條資料的一些經驗

先開啟目標檔案,寫入列名,再開啟原始檔案,按行讀取,按照條件判斷這一行是否為“髒”資料,不是的話再按照上面表格裡的要求進行處理,之後按行寫入目標檔案,這樣一來,電腦記憶體佔用率下降,電腦也就不會卡了,最後再將初步處理過的檔案利用pandas開啟,利用其中的DataFrame資料結構的方法進行去重,兩千萬條的資料五分鐘之內處理完成,以下為原始碼:

import csv
rows=[]
with open(r'C:\Users\Hanju\Desktop\uploadPortal(5).csv',"w", newline='') as _csvfile: 
    writer = csv.writer(_csvfile)
    #先寫入columns_name
    writer.writerow(["Dev_mac","Action","User_mac","User_mac_head","Bssid","WiFi","Time","Date"])
    i=0
    with open(r'D:\UploadPortalData\uploadPortal (5).csv',encoding='UTF-8') as csvfile:
        readCSV=csv.reader(csvfile,delimiter=',')
        for row in readCSV:
            if(len(row)!=8):
                continue
            row1=[]
            i+=1
            row1.append(row[0].replace(':','')[-5:])
                
            if row[2]=='auth':
                row1.append('1')
            elif row[2]=='deauth':
                row1.append('2')
            elif row[2]=='portal':
                row1.append('3')
            elif row[2]=='portalauth':
                row1.append('4')
 
            row1.append(str(row[3].replace(':','')))
            row1.append(str(row[3].replace(':','')[0:6]))
 
            if row[0]==row[4]:
                row1.append('2')
            else:
                row1.append('5')
 
            if 'City-WiFi-5G' in row[5]:
                row1.append('2')
            elif 'City-WiFi' in row[5]:
                row1.append('1')
            else:
                row1.append('0')
        
            row1.append(float(row[6])/86400.0-2.0/3.0+719530.0)
            
            row1.append(row[7])
                
            writer.writerow(row1)
 
print('Done')
print(i)
 
import pandas as pd
df=pd.read_csv(r'C:\Users\Hanju\Desktop\uploadPortal(5).csv')
#print(df.head())
#print(df.tail())
print(df.shape)
New_df=df.drop_duplicates(['Action','User_mac','Time'])
print(New_df.shape)
#print(New_df.head())
#print(New_df.tail())
New_df.to_csv(r'C:\Users\Hanju\Desktop\uploadPortal(5)_Final.csv')
print('Done')

參考3:

解決的方法:
1)使用SSCursor(流式遊標),避免客戶端佔用大量記憶體。(這個cursor實際上沒有快取下來任何資料,它不會讀取所有所有到記憶體中,它的做法是從儲存塊中讀取記錄,並且一條一條返回給你。)
2)使用迭代器而不用fetchall,即省記憶體又能很快拿到資料。

import MySQLdb.cursors
 
conn = MySQLdb.connect(host='ip地址', user='使用者名稱', passwd='密碼', db='資料庫名', port=3306,
                       charset='utf8', cursorclass = MySQLdb.cursors.SSCursor)
cur = conn.cursor()
cur.execute("SELECT * FROM bigtable");
row = cur.fetchone()
while row is not None:
    do something
    row = cur.fetchone()
 
cur.close()
conn.close()

需要注意的是,
1.因為SSCursor是沒有快取的遊標,結果集只要沒取完,這個conn是不能再處理別的sql,包括另外生成一個cursor也不行的。
如果需要幹別的,請另外再生成一個連線物件。
2. 每次讀取後處理資料要快,不能超過60s,否則mysql將會斷開這次連線,也可以修改 SET NET_WRITE_TIMEOUT = xx 來增加超時間隔。